nsivabalan commented on a change in pull request #2210:
URL: https://github.com/apache/hudi/pull/2210#discussion_r579798666



##########
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -1159,6 +1163,72 @@ public void 
testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Ex
     testCsvDFSSource(false, '\t', true, 
Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
+  @Test
+  public void testDFSSourceDeleteFilesAfterCommit() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "delete");
+    testDFSSourceCleanUp(props);
+  }
+
+  @Test
+  public void testDFSSourceArchiveFilesAfterCommit() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "archive");

Review comment:
       Can you please add a test for the no-op mode as well, just to ensure 
that with this patch it is in fact a no-op? :) 

##########
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -217,7 +217,7 @@ public static void initClass() throws Exception {
     
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, 
dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
 
-    prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
+    prepareParquetDFSFiles(PARQUET_SOURCE_ROOT + "/1.parquet", 
PARQUET_NUM_RECORDS);

Review comment:
       You can make just the sourceRoot an argument; "1.parquet" can be left 
abstracted within the method.

##########
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
##########
@@ -189,4 +205,63 @@ public void testReadingFromSource() throws IOException {
         Option.empty(), Long.MAX_VALUE);
     assertEquals(10101, fetch6.getBatch().get().count());
   }
+
+  @Test
+  public void testCleanUpSourceAfterCommit() throws IOException {

Review comment:
       Not sure why we have tests in this class. The classes in testutils are 
utility classes meant to assist in testing and should not contain any 
individual tests. Can you move the tests from this class to a separate test 
class? If we don't hold the line on this, it will slowly invite everyone to 
add more tests to this class. 

##########
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
##########
@@ -79,14 +85,24 @@ public void setup() throws Exception {
   @AfterEach
   public void teardown() throws Exception {
     super.teardown();
+    dfs.delete(new Path(dfsRoot), true);
+    dfs.delete(new Path(dfsArchivePath), true);
   }
 
   /**
    * Prepares the specific {@link Source} to test, by passing in necessary 
configurations.
    *
    * @return A {@link Source} using DFS as the file system.
+   * @param defaults
    */
-  protected abstract Source prepareDFSSource();
+  protected abstract Source prepareDFSSource(TypedProperties defaults);
+
+  /**
+   * Prepares the specific {@link Source} to test.
+   */
+  protected Source prepareDFSSource() {
+    return prepareDFSSource(new TypedProperties());

Review comment:
       If we follow the proposed changes in the tests, we won't need to make 
any changes to these tests.

##########
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -1159,6 +1163,72 @@ public void 
testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Ex
     testCsvDFSSource(false, '\t', true, 
Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
+  @Test
+  public void testDFSSourceDeleteFilesAfterCommit() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "delete");
+    testDFSSourceCleanUp(props);
+  }
+
+  @Test
+  public void testDFSSourceArchiveFilesAfterCommit() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "archive");
+    final String archivePath = dfsBasePath + "/archive";
+    dfs.mkdirs(new Path(archivePath));
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.archiveDir", 
archivePath);
+    assertEquals(0, Helpers.listAllFiles(archivePath).size());
+    testDFSSourceCleanUp(props);
+    // archive dir should contain source files
+    assertEquals(1, Helpers.listAllFiles(archivePath).size());
+  }
+
+  private void testDFSSourceCleanUp(TypedProperties props) throws Exception {
+    // since each source cleanup test will modify source, we need to set 
different dfs.root each test
+    final String dfsRoot = dfsBasePath + "/test_dfs_cleanup_source" + testNum;
+    dfs.mkdirs(new Path(dfsRoot));
+    prepareParquetDFSFiles(dfsRoot + "/1.parquet", PARQUET_NUM_RECORDS);
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+    prepareParquetDFSSource(false, false, props);

Review comment:
       Can we just move this config within prepareParquetDFSSource() itself, 
so that we don't need to change the prepareDFSSource signature? 

##########
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
##########
@@ -189,4 +205,63 @@ public void testReadingFromSource() throws IOException {
         Option.empty(), Long.MAX_VALUE);
     assertEquals(10101, fetch6.getBatch().get().count());
   }
+
+  @Test
+  public void testCleanUpSourceAfterCommit() throws IOException {
+    dfs.mkdirs(new Path(dfsRoot));
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "delete");
+    // use synchronous clean to be able to verify immediately
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.numThreads", "0");
+    SourceFormatAdapter sourceFormatAdapter = new 
SourceFormatAdapter(prepareDFSSource(props));
+    assertEquals(Option.empty(),
+        sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), 
Long.MAX_VALUE).getBatch());

Review comment:
       You can move lines 211 to 218 into a separate private method and reuse 
it in both tests, so that the main test stays lean and easy to read. 
   Also, please add no-op tests as well. 

##########
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -1159,6 +1163,72 @@ public void 
testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Ex
     testCsvDFSSource(false, '\t', true, 
Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
+  @Test
+  public void testDFSSourceDeleteFilesAfterCommit() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "delete");
+    testDFSSourceCleanUp(props);
+  }
+
+  @Test
+  public void testDFSSourceArchiveFilesAfterCommit() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.mode", "archive");
+    final String archivePath = dfsBasePath + "/archive";
+    dfs.mkdirs(new Path(archivePath));
+    props.setProperty("hoodie.deltastreamer.source.dfs.clean.archiveDir", 
archivePath);
+    assertEquals(0, Helpers.listAllFiles(archivePath).size());
+    testDFSSourceCleanUp(props);
+    // archive dir should contain source files
+    assertEquals(1, Helpers.listAllFiles(archivePath).size());
+  }
+
+  private void testDFSSourceCleanUp(TypedProperties props) throws Exception {
+    // since each source cleanup test will modify source, we need to set 
different dfs.root each test
+    final String dfsRoot = dfsBasePath + "/test_dfs_cleanup_source" + testNum;
+    dfs.mkdirs(new Path(dfsRoot));
+    prepareParquetDFSFiles(dfsRoot + "/1.parquet", PARQUET_NUM_RECORDS);
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+    prepareParquetDFSSource(false, false, props);
+
+    String tableBasePath = dfsBasePath + "/test_dfs_cleanup_output" + testNum;

Review comment:
       Can we use different paths for the table and dfsRoot? It looks like you 
use the same one; separate paths would avoid confusion.

##########
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -965,14 +965,13 @@ public void testDistributedTestDataSource() {
     assertEquals(1000, c);
   }
 
-  private static void prepareParquetDFSFiles(int numRecords) throws 
IOException {
-    String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+  private static void prepareParquetDFSFiles(String path, int numRecords) 
throws IOException {
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     Helpers.saveParquetToDFS(Helpers.toGenericRecords(
         dataGenerator.generateInserts("000", numRecords)), new Path(path));
   }
 
-  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer) throws IOException {
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer, TypedProperties props) throws IOException {

Review comment:
       Maybe you can name the new arg `additionalProps`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to