codope commented on a change in pull request #3648:
URL: https://github.com/apache/hudi/pull/3648#discussion_r716397052
##########
File path:
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
##########
@@ -135,6 +135,23 @@ public void testSqlSourceRowFormat() throws IOException {
assertEquals(10000, fetch1AsRows.getBatch().get().count());
}
+ /**
+ * Runs the test scenario of reading data from the source in row format.
+ * Source has no records.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testSqlSourceCheckpoint() throws IOException {
+ props.setProperty(sqlSourceConfig, "select * from test_sql_table where 1=0");
+ sqlSource = new SqlSource(props, jsc, sparkSession, schemaProvider);
+ sourceFormatAdapter = new SourceFormatAdapter(sqlSource);
+
+ InputBatch<Dataset<Row>> fetch1AsRows =
+ sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
+ assertEquals("", fetch1AsRows.getCheckpointForNextBatch());
Review comment:
We can add this assertion to the existing `testSqlSourceZeroRecord` instead
of adding a new test. Also, let's assert against the static constant `EMPTY_STRING`.
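The sketch below shows what the reviewer suggests: a single zero-record test that checks both the empty result and the checkpoint, and compares the checkpoint against a named constant instead of a bare `""` literal. It is self-contained so it runs without Hudi. `ZeroRecordCheckpointSketch`, `FakeBatch` and `fetchFromEmptyQuery` are hypothetical stand-ins for `InputBatch` and the SqlSource fetch. The local `EMPTY_STRING` is assumed to mirror the constant the reviewer refers to. In the real `TestSqlSource`, the change would presumably come down to one extra `assertEquals` in `testSqlSourceZeroRecord`.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, self-contained stand-ins; these are NOT Hudi classes.
class ZeroRecordCheckpointSketch {

  // Assumed to mirror the shared EMPTY_STRING constant mentioned in the review.
  static final String EMPTY_STRING = "";

  // Mimics the shape of an input batch: fetched rows plus the checkpoint for the next batch.
  static final class FakeBatch {
    final List<String> rows;
    final String checkpointForNextBatch;

    FakeBatch(List<String> rows, String checkpointForNextBatch) {
      this.rows = rows;
      this.checkpointForNextBatch = checkpointForNextBatch;
    }
  }

  // Stand-in for running "select ... where 1=0": no rows and no incremental position.
  static FakeBatch fetchFromEmptyQuery() {
    return new FakeBatch(Collections.emptyList(), EMPTY_STRING);
  }

  // One zero-record test covering both expectations, instead of a separate checkpoint test.
  static void zeroRecordTest() {
    FakeBatch batch = fetchFromEmptyQuery();
    check(batch.rows.isEmpty(), "expected no rows");
    check(EMPTY_STRING.equals(batch.checkpointForNextBatch), "expected empty checkpoint");
  }

  // Minimal assertion helper so the sketch needs no test framework.
  static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }
}
```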
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -336,11 +337,11 @@ public void refreshTimeline() throws IOException {
//if previous checkpoint is an empty string, skip resume use Option.empty()
resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
} else if (commitMetadata.getOperationType() == WriteOperationType.CLUSTER) {
- // incase of CLUSTER commit, no checkpoint will be available in metadata.
+ // in case of CLUSTER commit or sql source, no checkpoint will be available in metadata.
resumeCheckpointStr = Option.empty();
} else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
- throw new HoodieDeltaStreamerException(
+ LOG.warn(
Review comment:
This changes the behavior for all sources. In some cases it makes more
sense to throw an exception than to log a warning and carry on. Would it
be better to introduce a config
`hoodie.deltastreamer.fail.on.missing.checkpoint` that defaults to true? For
SqlSource, users could then set it to false.
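Here is a rough sketch of the config-gated behavior proposed above. It assumes the flag is read from the DeltaStreamer properties and defaults to `true`. `MissingCheckpointPolicy` and `resolveResumeCheckpoint` are hypothetical names, and the key is only the reviewer's proposal, not an existing option. The sketch uses `java.util.Optional` instead of Hudi's `Option`. It also collapses the CLUSTER and bootstrap-timestamp branches of `DeltaSync` into a single "checkpoint key missing" case.

```java
import java.util.Optional;
import java.util.Properties;
import java.util.logging.Logger;

// Hypothetical sketch of the proposed behavior; not Hudi's actual DeltaSync code.
class MissingCheckpointPolicy {

  // Key proposed in the review; defaulting to true preserves the current fail-fast behavior.
  static final String FAIL_ON_MISSING_CHECKPOINT = "hoodie.deltastreamer.fail.on.missing.checkpoint";

  private static final Logger LOG = Logger.getLogger(MissingCheckpointPolicy.class.getName());

  /**
   * Decides what to resume from, given the checkpoint stored in the last commit's metadata.
   * storedCheckpoint is null when the checkpoint key is absent from that metadata.
   */
  static Optional<String> resolveResumeCheckpoint(Properties props, String storedCheckpoint, String commitTime) {
    if (storedCheckpoint != null) {
      // An empty stored checkpoint means "no position": resume without one.
      return storedCheckpoint.isEmpty() ? Optional.empty() : Optional.of(storedCheckpoint);
    }
    boolean failOnMissing = Boolean.parseBoolean(props.getProperty(FAIL_ON_MISSING_CHECKPOINT, "true"));
    // Illustrative message only; not the wording Hudi uses.
    String message = "No checkpoint found in metadata of commit " + commitTime;
    if (failOnMissing) {
      // Default path: keep throwing, as DeltaSync did before this PR.
      throw new IllegalStateException(message);
    }
    // Opt-out path (e.g. SqlSource): warn and continue without a checkpoint.
    LOG.warning(message + "; continuing without a checkpoint");
    return Optional.empty();
  }
}
```

With the default value, existing pipelines still fail fast on a missing checkpoint. Only a job that explicitly sets the flag to `false` (for example, one that reads from SqlSource) takes the warn-and-continue path, so the behavior change no longer applies to every source.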
##########
File path:
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
##########
@@ -135,6 +135,23 @@ public void testSqlSourceRowFormat() throws IOException {
assertEquals(10000, fetch1AsRows.getBatch().get().count());
}
+ /**
+ * Runs the test scenario of reading data from the source in row format.
+ * Source has no records.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testSqlSourceCheckpoint() throws IOException {
Review comment:
It would be good to have a test for SqlSource in TestHoodieDeltaStreamer
as well. You can take it up as a follow-up task if you like. Typically, for
every source we want both a unit test and a DeltaStreamer test.
--
This is an automated message from the Apache Git Service.