This is an automated email from the ASF dual-hosted git repository. vbalaji pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new 1ca912a [HUDI-667] Fixing delete tests for DeltaStreamer (#1395) 1ca912a is described below commit 1ca912af0904283a270a822d5876babca5c89739 Author: Sivabalan Narayanan <sivab...@uber.com> AuthorDate: Wed Mar 11 16:19:23 2020 -0700 [HUDI-667] Fixing delete tests for DeltaStreamer (#1395) --- .../hudi/common/HoodieTestDataGenerator.java | 42 +++++++++++----------- .../hudi/utilities/TestHoodieDeltaStreamer.java | 4 +-- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java index e0d2a53..6d86e93 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java @@ -399,7 +399,6 @@ public class HoodieTestDataGenerator { */ public Stream<HoodieRecord> generateUniqueUpdatesStream(String commitTime, Integer n) { final Set<KeyPartition> used = new HashSet<>(); - if (n > numExistingKeys) { throw new IllegalArgumentException("Requested unique updates is greater than number of available keys"); } @@ -429,24 +428,24 @@ public class HoodieTestDataGenerator { */ public Stream<HoodieKey> generateUniqueDeleteStream(Integer n) { final Set<KeyPartition> used = new HashSet<>(); - if (n > numExistingKeys) { throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys"); } - return IntStream.range(0, n).boxed().map(i -> { - int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1); - KeyPartition kp = existingKeys.get(index); - // Find the available keyPartition starting from randomly chosen one. - while (used.contains(kp)) { + List<HoodieKey> result = new ArrayList<>(); + for (int i = 0; i < n; i++) { + int index = RAND.nextInt(numExistingKeys); + while (!existingKeys.containsKey(index)) { index = (index + 1) % numExistingKeys; - kp = existingKeys.get(index); } - existingKeys.remove(kp); + KeyPartition kp = existingKeys.remove(index); + existingKeys.put(index, existingKeys.get(numExistingKeys - 1)); + existingKeys.remove(numExistingKeys - 1); numExistingKeys--; used.add(kp); - return kp.key; - }); + result.add(kp.key); + } + return result.stream(); } /** @@ -458,28 +457,29 @@ public class HoodieTestDataGenerator { */ public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String commitTime, Integer n) { final Set<KeyPartition> used = new HashSet<>(); - if (n > numExistingKeys) { throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys"); } - return IntStream.range(0, n).boxed().map(i -> { - int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1); - KeyPartition kp = existingKeys.get(index); - // Find the available keyPartition starting from randomly chosen one. - while (used.contains(kp)) { + List<HoodieRecord> result = new ArrayList<>(); + for (int i = 0; i < n; i++) { + int index = RAND.nextInt(numExistingKeys); + while (!existingKeys.containsKey(index)) { index = (index + 1) % numExistingKeys; - kp = existingKeys.get(index); } - existingKeys.remove(kp); + // swap chosen index with last index and remove last entry. + KeyPartition kp = existingKeys.remove(index); + existingKeys.put(index, existingKeys.get(numExistingKeys - 1)); + existingKeys.remove(numExistingKeys - 1); numExistingKeys--; used.add(kp); try { - return new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime)); + result.add(new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime))); } catch (IOException e) { throw new HoodieIOException(e.getMessage(), e); } - }); + } + return result.stream(); } public String[] getPartitionPaths() { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java index 9d324dc..100faa2 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java @@ -422,8 +422,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { } else { TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, dfs); } - TestHelpers.assertRecordCount(totalRecords + 200, tableBasePath + "/*/*.parquet", sqlContext); - TestHelpers.assertDistanceCount(totalRecords + 200, tableBasePath + "/*/*.parquet", sqlContext); + TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext); + TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext); return true; }, 180); ds.shutdownGracefully();