This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1ca912a [HUDI-667] Fixing delete tests for DeltaStreamer (#1395)
1ca912a is described below
commit 1ca912af0904283a270a822d5876babca5c89739
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Mar 11 16:19:23 2020 -0700
[HUDI-667] Fixing delete tests for DeltaStreamer (#1395)
---
.../hudi/common/HoodieTestDataGenerator.java | 42 +++++++++++-----------
.../hudi/utilities/TestHoodieDeltaStreamer.java | 4 +--
2 files changed, 23 insertions(+), 23 deletions(-)
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index e0d2a53..6d86e93 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -399,7 +399,6 @@ public class HoodieTestDataGenerator {
*/
public Stream<HoodieRecord> generateUniqueUpdatesStream(String commitTime,
Integer n) {
final Set<KeyPartition> used = new HashSet<>();
-
if (n > numExistingKeys) {
throw new IllegalArgumentException("Requested unique updates is greater
than number of available keys");
}
@@ -429,24 +428,24 @@ public class HoodieTestDataGenerator {
*/
public Stream<HoodieKey> generateUniqueDeleteStream(Integer n) {
final Set<KeyPartition> used = new HashSet<>();
-
if (n > numExistingKeys) {
throw new IllegalArgumentException("Requested unique deletes is greater
than number of available keys");
}
- return IntStream.range(0, n).boxed().map(i -> {
- int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1);
- KeyPartition kp = existingKeys.get(index);
- // Find the available keyPartition starting from randomly chosen one.
- while (used.contains(kp)) {
+ List<HoodieKey> result = new ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ int index = RAND.nextInt(numExistingKeys);
+ while (!existingKeys.containsKey(index)) {
index = (index + 1) % numExistingKeys;
- kp = existingKeys.get(index);
}
- existingKeys.remove(kp);
+ KeyPartition kp = existingKeys.remove(index);
+ existingKeys.put(index, existingKeys.get(numExistingKeys - 1));
+ existingKeys.remove(numExistingKeys - 1);
numExistingKeys--;
used.add(kp);
- return kp.key;
- });
+ result.add(kp.key);
+ }
+ return result.stream();
}
/**
@@ -458,28 +457,29 @@ public class HoodieTestDataGenerator {
*/
public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String
commitTime, Integer n) {
final Set<KeyPartition> used = new HashSet<>();
-
if (n > numExistingKeys) {
throw new IllegalArgumentException("Requested unique deletes is greater
than number of available keys");
}
- return IntStream.range(0, n).boxed().map(i -> {
- int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1);
- KeyPartition kp = existingKeys.get(index);
- // Find the available keyPartition starting from randomly chosen one.
- while (used.contains(kp)) {
+ List<HoodieRecord> result = new ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ int index = RAND.nextInt(numExistingKeys);
+ while (!existingKeys.containsKey(index)) {
index = (index + 1) % numExistingKeys;
- kp = existingKeys.get(index);
}
- existingKeys.remove(kp);
+ // swap chosen index with last index and remove last entry.
+ KeyPartition kp = existingKeys.remove(index);
+ existingKeys.put(index, existingKeys.get(numExistingKeys - 1));
+ existingKeys.remove(numExistingKeys - 1);
numExistingKeys--;
used.add(kp);
try {
- return new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key,
commitTime));
+ result.add(new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key,
commitTime)));
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
- });
+ }
+ return result.stream();
}
public String[] getPartitionPaths() {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 9d324dc..100faa2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -422,8 +422,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
} else {
TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, dfs);
}
- TestHelpers.assertRecordCount(totalRecords + 200, tableBasePath +
"/*/*.parquet", sqlContext);
- TestHelpers.assertDistanceCount(totalRecords + 200, tableBasePath +
"/*/*.parquet", sqlContext);
+ TestHelpers.assertRecordCount(totalRecords, tableBasePath +
"/*/*.parquet", sqlContext);
+ TestHelpers.assertDistanceCount(totalRecords, tableBasePath +
"/*/*.parquet", sqlContext);
return true;
}, 180);
ds.shutdownGracefully();