tsreaper commented on code in PR #1594:
URL: https://github.com/apache/incubator-paimon/pull/1594#discussion_r1267471438
##########
paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java:
##########
@@ -69,6 +70,87 @@ public static List<ReusingTestData>
getExpectedForDeduplicate(List<ReusingTestDa
return expected;
}
+ public static List<ReusingTestData>
getExpectedForPartialUpdate(List<ReusingTestData> input) {
+ input = new ArrayList<>(input);
+ Collections.sort(input);
+
+ List<ReusingTestData> expected = new ArrayList<>();
+ List<ReusingTestData> current = new ArrayList<>();
+ for (ReusingTestData data : input) {
+ while (true) {
+ if (current.isEmpty() || data.key ==
current.get(current.size() - 1).key) {
+ current.add(data);
+ break;
+ } else {
+ mergePartialUpdate(current).ifPresent(expected::add);
+ }
+ }
+ }
+ mergePartialUpdate(current).ifPresent(expected::add);
+ return expected;
+ }
+
+ private static Optional<ReusingTestData>
mergePartialUpdate(List<ReusingTestData> records) {
+ try {
+ if (records.size() == 1) {
+ return Optional.of(records.get(0));
+ } else {
+ for (int i = records.size() - 1; i >= 0; i--) {
+ if (records.get(i).valueKind.isAdd()) {
+ return Optional.of(records.get(i));
+ }
+ }
+ }
+ } finally {
+ records.clear();
+ }
+ return Optional.empty();
+ }
Review Comment:
```suggestion
/**
 * Builds the records a partial-update merge is expected to emit for the given test input.
 *
 * <p>A sorted copy of the input is bucketed by key, keeping keys in the order they first
 * appear in the sorted copy. A key that owns exactly one record contributes that record
 * unchanged. A key with several records contributes the last record whose value kind is
 * an "add" kind, or nothing at all when no such record exists.
 *
 * @param input the test records; the caller's list is left untouched
 * @return the expected records, at most one per distinct key
 */
public static List<ReusingTestData> getExpectedForPartialUpdate(List<ReusingTestData> input) {
    List<ReusingTestData> sorted = new ArrayList<>(input);
    Collections.sort(sorted);

    // LinkedHashMap keeps the keys in first-seen order of the sorted records.
    LinkedHashMap<Integer, List<ReusingTestData>> byKey = new LinkedHashMap<>();
    for (ReusingTestData record : sorted) {
        byKey.computeIfAbsent(record.key, unused -> new ArrayList<>()).add(record);
    }

    List<ReusingTestData> expected = new ArrayList<>();
    for (List<ReusingTestData> records : byKey.values()) {
        if (records.size() == 1) {
            // due to ReducerMergeFunctionWrapper
            // NOTE(review): presumably the wrapper hands a lone record through without
            // running the merge function - confirm against the wrapper implementation.
            expected.add(records.get(0));
            continue;
        }
        // Scan from the newest record backwards; the first add-kind record found is the
        // last one in group order.
        for (int i = records.size() - 1; i >= 0; i--) {
            ReusingTestData candidate = records.get(i);
            if (candidate.valueKind.isAdd()) {
                expected.add(candidate);
                break;
            }
        }
    }
    return expected;
}
```
##########
paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java:
##########
@@ -69,6 +70,87 @@ public static List<ReusingTestData>
getExpectedForDeduplicate(List<ReusingTestDa
return expected;
}
+ public static List<ReusingTestData>
getExpectedForPartialUpdate(List<ReusingTestData> input) {
+ input = new ArrayList<>(input);
+ Collections.sort(input);
+
+ List<ReusingTestData> expected = new ArrayList<>();
+ List<ReusingTestData> current = new ArrayList<>();
+ for (ReusingTestData data : input) {
+ while (true) {
+ if (current.isEmpty() || data.key ==
current.get(current.size() - 1).key) {
+ current.add(data);
+ break;
+ } else {
+ mergePartialUpdate(current).ifPresent(expected::add);
+ }
+ }
+ }
+ mergePartialUpdate(current).ifPresent(expected::add);
+ return expected;
+ }
+
+ private static Optional<ReusingTestData>
mergePartialUpdate(List<ReusingTestData> records) {
+ try {
+ if (records.size() == 1) {
+ return Optional.of(records.get(0));
+ } else {
+ for (int i = records.size() - 1; i >= 0; i--) {
+ if (records.get(i).valueKind.isAdd()) {
+ return Optional.of(records.get(i));
+ }
+ }
+ }
+ } finally {
+ records.clear();
+ }
+ return Optional.empty();
+ }
+
+ public static List<ReusingTestData>
getExpectedForAgg(List<ReusingTestData> input) {
+ input = new ArrayList<>(input);
+ Collections.sort(input);
+
+ List<ReusingTestData> expected = new ArrayList<>();
+ List<ReusingTestData> current = new ArrayList<>();
+ for (ReusingTestData data : input) {
+ while (true) {
+ if (current.isEmpty() || data.key ==
current.get(current.size() - 1).key) {
+ current.add(data);
+ break;
+ } else {
+ expected.add(mergeAgg(current));
+ }
+ }
+ }
+ expected.add(mergeAgg(current));
+ return expected;
+ }
+
+ private static ReusingTestData mergeAgg(List<ReusingTestData> records) {
+ try {
+ if (records.size() == 1) {
+ return records.get(0);
+ } else {
+ ReusingTestData lastOne = records.get(records.size() - 1);
+ ReusingTestData data =
+ new ReusingTestData(lastOne.key,
lastOne.sequenceNumber, RowKind.INSERT, 0);
+ for (int i = records.size() - 1; i >= 0; i--) {
+ ReusingTestData current = records.get(i);
+ data =
+ data.copy(
+ data.value
+ + (current.valueKind.isAdd()
+ ? current.value
+ : -current.value));
+ }
+ return data;
+ }
+ } finally {
+ records.clear();
+ }
+ }
Review Comment:
```suggestion
/**
 * Builds the records an aggregating (sum) merge is expected to emit for the given test input.
 *
 * <p>The input is copied, sorted with {@link Collections#sort(List)}, and grouped by key in
 * the order keys first appear in the sorted copy. A key that owns exactly one record
 * contributes that record unchanged. A key with several records contributes a single
 * {@code RowKind.INSERT} record that carries the key and sequence number of the group's last
 * record and the signed sum of the group's values: add-kind records are added, all other
 * records are subtracted.
 *
 * @param input the test records; the caller's list is left untouched
 * @return the expected records, exactly one per distinct key
 */
public static List<ReusingTestData> getExpectedForAgg(List<ReusingTestData> input) {
    input = new ArrayList<>(input);
    Collections.sort(input);
    LinkedHashMap<Integer, List<ReusingTestData>> groups = new LinkedHashMap<>();
    for (ReusingTestData d : input) {
        groups.computeIfAbsent(d.key, k -> new ArrayList<>()).add(d);
    }
    List<ReusingTestData> expected = new ArrayList<>();
    for (List<ReusingTestData> group : groups.values()) {
        if (group.size() == 1) {
            // due to ReducerMergeFunctionWrapper
            // A lone record must be passed through as-is (same value kind and value),
            // matching the single-record branch of the code this suggestion replaces
            // and the partial-update variant of this helper.
            expected.add(group.get(0));
        } else {
            long sum =
                    group.stream()
                            .mapToLong(d -> d.valueKind.isAdd() ? d.value : -d.value)
                            .sum();
            ReusingTestData last = group.get(group.size() - 1);
            expected.add(
                    new ReusingTestData(last.key, last.sequenceNumber, RowKind.INSERT, sum));
        }
    }
    return expected;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]