lokeshj1703 commented on code in PR #9538:
URL: https://github.com/apache/hudi/pull/9538#discussion_r1319974133
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java:
##########
@@ -135,10 +128,10 @@ public void
shouldNotFindNewDataIfCommitTimeOfWriteAndReadAreEqual() throws IOEx
Pair<String, List<HoodieRecord>> inserts =
writeGcsMetadataRecords(commitTimeForWrites);
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, 0, inserts.getKey());
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, inserts.getKey());
verify(gcsObjectMetadataFetcher,
times(0)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
Review Comment:
Since we are moving away from the mocked `GcsObjectMetadataFetcher`, can we
remove the field `gcsObjectMetadataFetcher`?
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java:
##########
@@ -261,45 +212,28 @@ public void testTwoFilesAndContinueAcrossCommits() throws
IOException {
Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);
- when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(),
eq(cloudObjectMetadataList), Mockito.any(),
- eq(schemaProvider))).thenReturn(Option.of(rows));
when(queryRunner.run(Mockito.any())).thenReturn(inputDs);
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, 4, "1#path/to/file1.json");
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"),
100L, 4, "1#path/to/file2.json");
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"),
1000L, 4, "2#path/to/file5.json");
-
- verify(gcsObjectMetadataFetcher,
times(3)).getGcsObjectMetadata(Mockito.any(), Mockito.any(),
- anyBoolean());
- verify(gcsObjectDataFetcher, times(3)).getCloudObjectDataDF(Mockito.any(),
- eq(cloudObjectMetadataList), Mockito.any(), eq(schemaProvider));
-
- schemaProvider = Option.empty();
- when(gcsObjectDataFetcher.getCloudObjectDataDF(Mockito.any(),
eq(cloudObjectMetadataList), Mockito.any(),
- eq(schemaProvider))).thenReturn(Option.of(rows));
- readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, 4, "1#path/to/file1.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, "1#path/to/file1.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file1.json"),
100L, "1#path/to/file2.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file2.json"),
1000L, "2#path/to/file5.json");
+ readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads),
100L, "1#path/to/file1.json");
}
private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy,
- Option<String> checkpointToPull, long
sourceLimit, int expectedCount, String expectedCheckpoint) {
+ Option<String> checkpointToPull, long
sourceLimit, String expectedCheckpoint) {
TypedProperties typedProperties = setProps(missingCheckpointStrategy);
+ typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format",
"json");
GcsEventsHoodieIncrSource incrSource = new
GcsEventsHoodieIncrSource(typedProperties, jsc(),
- spark(), schemaProvider.orElse(null), gcsObjectMetadataFetcher,
gcsObjectDataFetcher, queryRunner);
+ spark(), schemaProvider.orElse(null), new
GcsObjectMetadataFetcher(typedProperties, "json"), gcsObjectDataFetcher,
queryRunner);
Pair<Option<Dataset<Row>>, String> dataAndCheckpoint =
incrSource.fetchNextBatch(checkpointToPull, sourceLimit);
Option<Dataset<Row>> datasetOpt = dataAndCheckpoint.getLeft();
String nextCheckPoint = dataAndCheckpoint.getRight();
Assertions.assertNotNull(nextCheckPoint);
-
- if (expectedCount == 0) {
- assertFalse(datasetOpt.isPresent());
- } else {
- assertEquals(datasetOpt.get().count(), expectedCount);
- }
-
Review Comment:
Can we still keep these assertions? The dataset would still return the
records, so we can continue to verify its count (or its absence when none are expected).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]