xushiyan commented on a change in pull request #2210: URL: https://github.com/apache/hudi/pull/2210#discussion_r734982758
########## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP; + +/** + * This class provides various clean-up strategies for DeltaStreamer when reading from DFS file sources. + * Each <code>*DFSSource</code> may invoke this to clean up/archive files after each successful commit. + * + */ +public abstract class FileSourceCleaner { + private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class); + + /** + * Configs supported. 
+ */ + public static class Config { + private Config() {} + + public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode"; + public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name(); + + public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads"; + public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1; + + public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir"; + } + + private enum CleanMode { + /** + * Remove source files after each successfully commit. + */ + DELETE, + /** + * Move source files to specified archive directory after each successful commit. + * Used in conjunction with <code>hoodie.deltastreamer.source.dfs.clean.archiveDir</code> + */ + ARCHIVE, + /** + * Default option. Do not clean up source files. + */ + OFF + } Review comment: @hotienvu @nsivabalan @vinothchandar I have a different view on supporting these features: we need to be extremely cautious when dealing with users' data. I would imagine the worst-case scenario would be an unintended config in Hudi leading to the deletion of users' data. ########## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java ########## @@ -79,6 +79,15 @@ protected Source(TypedProperties props, JavaSparkContext sparkContext, SparkSess : new InputBatch<>(batch.getBatch(), batch.getCheckpointForNextBatch(), overriddenSchemaProvider); } + /** + * Called after a new batch is committed successfully. Can be used to clean up source if necessary. + * + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public void postCommit() { + // no-op + } + Review comment: Big +1 to this API — it definitely comes in handy. 
########## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/FileSourceCleaner.java ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP; + +/** + * This class provides various clean-up strategies for DeltaStreamer when reading from DFS file sources. + * Each <code>*DFSSource</code> may invoke this to clean up/archive files after each successful commit. + * + */ +public abstract class FileSourceCleaner { + private static final Logger LOG = LogManager.getLogger(FileSourceCleaner.class); + + /** + * Configs supported. 
+ */ + public static class Config { + private Config() {} + + public static final String FILE_SOURCE_CLEAN_MODE_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.mode"; + public static final String DEFAULT_FILE_SOURCE_CLEAN_MODE_OPT_KEY = CleanMode.OFF.name(); + + public static final String FILE_SOURCE_CLEAN_NUM_THREADS_OPT_KEY = "hoodie.deltastreamer.source.dfs.clean.numThreads"; + public static final int DEFAULT_FILE_SOURCE_CLEAN_NUM_THREADS_OPT_VAL = 1; + + public static final String FILE_SOURCE_ARCHIVE_DIR_KEY = "hoodie.deltastreamer.source.dfs.clean.archiveDir"; + } + + private enum CleanMode { + /** + * Remove source files after each successfully commit. + */ + DELETE, + /** + * Move source files to specified archive directory after each successful commit. + * Used in conjunction with <code>hoodie.deltastreamer.source.dfs.clean.archiveDir</code> + */ + ARCHIVE, + /** + * Default option. Do not clean up source files. + */ + OFF + } Review comment: I do think the postCommit() API is very useful. I'd suggest providing an abstract class, say `SourcePostCommitAction`, to be extended by users. In Hudi we may just ship a default no-op implementation. Users are free to implement clean-up, archiving, posting metrics, etc. We just need to instantiate user-defined classes via a hoodie config, like the Transformer or Payload classes. I also think defining a set of metrics for DFSSource would be helpful — such as the number of files picked up, total size of files, and file scanning duration — to provide visibility to users when dealing with DFSSource implementation issues. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
