This is an automated email from the ASF dual-hosted git repository.

garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 0c4f2fd  [HUDI-1984] Support independent flink hudi compaction function (#3046)
0c4f2fd is described below

commit 0c4f2fdc15982870b59d3458303973af95984266
Author: swuferhong <337361...@qq.com>
AuthorDate: Sun Jun 13 15:04:46 2021 +0800

    [HUDI-1984] Support independent flink hudi compaction function (#3046)
---
 .../FlinkScheduleCompactionActionExecutor.java      |  44 +++++++
 .../apache/hudi/configuration/FlinkOptions.java     |  35 -----
 .../java/org/apache/hudi/sink/CleanFunction.java    |   2 +-
 .../apache/hudi/sink/compact/CompactFunction.java   |  29 ++--
 .../hudi/sink/compact/CompactionCommitSink.java     |  18 ++-
 .../sink/compact/CompactionPlanSourceFunction.java  | 111 ++++++++++++++++
 .../hudi/sink/compact/FlinkCompactionConfig.java    | 107 +++++++++++++++
 .../hudi/sink/compact/HoodieFlinkCompactor.java     | 146 +++++++++++++++++++++
 ...tFunction.java => NonKeyedCompactFunction.java}  |  44 +++----
 .../apache/hudi/streamer/FlinkStreamerConfig.java   |  37 +++++-
 .../apache/hudi/streamer/HoodieFlinkStreamer.java   |   6 +-
 .../java/org/apache/hudi/util/CompactionUtil.java   |  86 ++++++++++++
 .../java/org/apache/hudi/util/StreamerUtil.java     |  12 +-
 .../org/apache/hudi/sink/StreamWriteITCase.java     | 100 +++++++++++++-
 14 files changed, 692 insertions(+), 85 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java
index 7db2dc8..d18cac2 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/FlinkScheduleCompactionActionExecutor.java
@@ -28,12 +28,16 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -50,12 +54,15 @@ public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload
   private static final Logger LOG = LogManager.getLogger(FlinkScheduleCompactionActionExecutor.class);
 
+  private final Option<Map<String, String>> extraMetadata;
+
   public FlinkScheduleCompactionActionExecutor(HoodieEngineContext context,
                                                HoodieWriteConfig config,
                                                HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
                                                String instantTime,
                                                Option<Map<String, String>> extraMetadata) {
     super(context, config, table, instantTime, extraMetadata);
+    this.extraMetadata = extraMetadata;
   }
 
   @Override
@@ -149,4 +156,41 @@ public class FlinkScheduleCompactionActionExecutor<T extends HoodieRecordPayload
     }
     return timestamp;
   }
+
+  @Override
+  public Option<HoodieCompactionPlan> execute() {
+    if
(!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() + && !config.getFailedWritesCleanPolicy().isLazy()) { + // if there are inflight writes, their instantTime must not be less than that of compaction instant time + table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant() + .ifPresent(earliestInflight -> ValidationUtils.checkArgument( + HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime), + "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight + + ", Compaction scheduled at " + instantTime)); + // Committed and pending compaction instants should have strictly lower timestamps + List<HoodieInstant> conflictingInstants = table.getActiveTimeline() + .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants() + .filter(instant -> HoodieTimeline.compareTimestamps( + instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime)) + .collect(Collectors.toList()); + ValidationUtils.checkArgument(conflictingInstants.isEmpty(), + "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :" + + conflictingInstants); + } + + HoodieCompactionPlan plan = scheduleCompaction(); + if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) { + extraMetadata.ifPresent(plan::setExtraMetadata); + HoodieInstant compactionInstant = + new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instantTime); + try { + table.getActiveTimeline().saveToCompactionRequested(compactionInstant, + TimelineMetadataUtils.serializeCompactionPlan(plan)); + } catch (IOException ioe) { + throw new HoodieIOException("Exception scheduling compaction", ioe); + } + return Option.of(plan); + } + return Option.empty(); + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index d407697..b14ef6e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -25,8 +25,6 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.keygen.constant.KeyGeneratorType; -import org.apache.hudi.streamer.FlinkStreamerConfig; -import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; @@ -521,39 +519,6 @@ public class FlinkOptions { private static final String PROPERTIES_PREFIX = "properties."; /** - * Transforms a {@code HoodieFlinkStreamer.Config} into {@code Configuration}. - * The latter is more suitable for the table APIs. It reads all the properties - * in the properties file (set by `--props` option) and cmd line options - * (set by `--hoodie-conf` option). 
- */ - @SuppressWarnings("unchecked, rawtypes") - public static org.apache.flink.configuration.Configuration fromStreamerConfig(FlinkStreamerConfig config) { - Map<String, String> propsMap = new HashMap<String, String>((Map) StreamerUtil.getProps(config)); - org.apache.flink.configuration.Configuration conf = fromMap(propsMap); - - conf.setString(FlinkOptions.PATH, config.targetBasePath); - conf.setString(READ_AVRO_SCHEMA_PATH, config.readSchemaFilePath); - conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName); - // copy_on_write works same as COPY_ON_WRITE - conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase()); - conf.setString(FlinkOptions.OPERATION, config.operation.value()); - conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField); - conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName); - conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes); - conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes)); - conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval)); - conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors); - conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField); - conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField); - // keygenClass has higher priority than keygenType - conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType); - conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass); - conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum); - - return conf; - } - - /** * Collects the config options that start with 'properties.' into a 'key'='value' list. */ public static Map<String, String> getHoodieProperties(Map<String, String> options) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index 14dd827..1ca593f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -46,7 +46,7 @@ public class CleanFunction<T> extends AbstractRichFunction private final Configuration conf; - private HoodieFlinkWriteClient writeClient; + protected HoodieFlinkWriteClient writeClient; private NonThrownExecutor executor; private volatile boolean isCleaning; diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java index ee8678b..13a9f45 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java @@ -33,6 +33,7 @@ import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.List; /** @@ -79,22 +80,24 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv final CompactionOperation compactionOperation = event.getOperation(); // executes the compaction task asynchronously to not block the checkpoint barrier propagate. 
executor.execute( - () -> { - HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); - List<WriteStatus> writeStatuses = compactor.compact( - new HoodieFlinkCopyOnWriteTable<>( - this.writeClient.getConfig(), - this.writeClient.getEngineContext(), - this.writeClient.getHoodieTable().getMetaClient()), - this.writeClient.getHoodieTable().getMetaClient(), - this.writeClient.getConfig(), - compactionOperation, - instantTime); - collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID)); - }, "Execute compaction for instant %s from task %d", instantTime, taskID + () -> doCompaction(instantTime, compactionOperation, collector), "Execute compaction for instant %s from task %d", instantTime, taskID ); } + private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException { + HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); + List<WriteStatus> writeStatuses = compactor.compact( + new HoodieFlinkCopyOnWriteTable<>( + this.writeClient.getConfig(), + this.writeClient.getEngineContext(), + this.writeClient.getHoodieTable().getMetaClient()), + this.writeClient.getHoodieTable().getMetaClient(), + this.writeClient.getConfig(), + compactionOperation, + instantTime); + collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID)); + } + @VisibleForTesting public void setExecutor(NonThrownExecutor executor) { this.executor = executor; diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index 41831cd..0884342 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -19,12 +19,12 @@ package org.apache.hudi.sink.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.util.StreamerUtil; @@ -60,11 +60,6 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> { private final Configuration conf; /** - * Write Client. - */ - private transient HoodieFlinkWriteClient writeClient; - - /** * Buffer to collect the event from each compact task {@code CompactFunction}. * The key is the instant time. 
*/ @@ -78,7 +73,9 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); + if (writeClient == null) { + this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); + } this.commitBuffer = new HashMap<>(); } @@ -122,6 +119,13 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> { } // commit the compaction this.writeClient.commitCompaction(instant, statuses, Option.empty()); + + // Whether to cleanup the old log file when compaction + if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { + this.writeClient.startAsyncCleaning(); + this.writeClient.waitForCleaningFinish(); + } + // reset the status reset(instant); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java new file mode 100644 index 0000000..5d5a008 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanSourceFunction.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.compact; + +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.model.CompactionOperation; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.table.HoodieFlinkTable; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +import static java.util.stream.Collectors.toList; + +/** + * Flink hudi compaction source function. + * + * <P>This function read the compaction plan as {@link CompactionOperation}s then assign the compaction task + * event {@link CompactionPlanEvent} to downstream operators. + * + * <p>The compaction instant time is specified explicitly with strategies: + * + * <ul> + * <li>If the timeline has no inflight instants, + * use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()} + * as the instant time;</li> + * <li>If the timeline has inflight instants, + * use the {earliest inflight instant time - 1ms} as the instant time.</li> + * </ul> + */ +public class CompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction<CompactionPlanEvent> { + + protected static final Logger LOG = LoggerFactory.getLogger(CompactionPlanSourceFunction.class); + + /** + * Compaction instant time. 
+ */ + private String compactionInstantTime; + + /** + * Hoodie flink table. + */ + private HoodieFlinkTable<?> table; + + /** + * The compaction plan. + */ + private HoodieCompactionPlan compactionPlan; + + /** + * Hoodie instant. + */ + private HoodieInstant instant; + + public CompactionPlanSourceFunction(HoodieFlinkTable<?> table, HoodieInstant instant, HoodieCompactionPlan compactionPlan, String compactionInstantTime) { + this.table = table; + this.instant = instant; + this.compactionPlan = compactionPlan; + this.compactionInstantTime = compactionInstantTime; + } + + @Override + public void open(Configuration parameters) throws Exception { + // no operation + } + + @Override + public void run(SourceContext sourceContext) throws Exception { + // Mark instant as compaction inflight + table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); + table.getMetaClient().reloadActiveTimeline(); + + List<CompactionOperation> operations = this.compactionPlan.getOperations().stream() + .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); + LOG.info("CompactionPlanFunction compacting " + operations + " files"); + for (CompactionOperation operation : operations) { + sourceContext.collect(new CompactionPlanEvent(compactionInstantTime, operation)); + } + } + + @Override + public void close() throws Exception { + // no operation + } + + @Override + public void cancel() { + // no operation + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java new file mode 100644 index 0000000..8e0c671 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.compact; + +import org.apache.hudi.configuration.FlinkOptions; + +import com.beust.jcommander.Parameter; +import org.apache.flink.configuration.Configuration; + +/** + * Configurations for Hoodie Flink compaction. 
+ */ +public class FlinkCompactionConfig extends Configuration { + + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + + // ------------------------------------------------------------------------ + // Hudi Write Options + // ------------------------------------------------------------------------ + + @Parameter(names = {"--path"}, description = "Base path for the target hoodie table.", required = true) + public String path; + + // ------------------------------------------------------------------------ + // Compaction Options + // ------------------------------------------------------------------------ + + public static final String NUM_COMMITS = "num_commits"; + public static final String TIME_ELAPSED = "time_elapsed"; + public static final String NUM_AND_TIME = "num_and_time"; + public static final String NUM_OR_TIME = "num_or_time"; + @Parameter(names = {"--compaction-trigger-strategy"}, + description = "Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n" + + "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n" + + "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n" + + "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n" + + "Default is 'num_commits'", + required = false) + public String compactionTriggerStrategy = NUM_COMMITS; + + @Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 5 commits", required = false) + public Integer compactionDeltaCommits = 1; + + @Parameter(names = {"--compaction-delta-seconds"}, description = "Max delta seconds time needed to trigger compaction, default 1 hour", required = false) + public Integer compactionDeltaSeconds = 3600; + + @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default", required = false) + public Boolean cleanAsyncEnable = false; + + @Parameter(names = {"--clean-retain-commits"}, + description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n" + + "This also directly translates into how much you can incrementally pull on this table, default 10", + required = false) + public Integer cleanRetainCommits = 10; + + @Parameter(names = {"--archive-min-commits"}, + description = "Min number of commits to keep before archiving older commits into a sequential log, default 20.", + required = false) + public Integer archiveMinCommits = 20; + + @Parameter(names = {"--archive-max-commits"}, + description = "Max number of commits to keep before archiving older commits into a sequential log, default 30.", + required = false) + public Integer archiveMaxCommits = 30; + + @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB.", required = false) + public Integer compactionMaxMemory = 100; + + /** + * Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}. + * The latter is more suitable for the table APIs. It reads all the properties + * in the properties file (set by `--props` option) and cmd line options + * (set by `--hoodie-conf` option). 
+ * */ + public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkCompactionConfig config) { + org.apache.flink.configuration.Configuration conf = new Configuration(); + + conf.setString(FlinkOptions.PATH, config.path); + conf.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, config.compactionTriggerStrategy); + conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, config.archiveMaxCommits); + conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, config.archiveMinCommits); + conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, config.cleanRetainCommits); + conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, config.compactionDeltaCommits); + conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, config.compactionDeltaSeconds); + conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, config.compactionMaxMemory); + conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, config.cleanAsyncEnable); + + return conf; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java new file mode 100644 index 0000000..57b5c06 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.compact; + +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.util.CompactionUtil; +import org.apache.hudi.util.StreamerUtil; + +import com.beust.jcommander.JCommander; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.ProcessOperator; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Flink hudi compaction program that can be executed manually. 
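[Usage sketch, not part of the commit: the standalone job below can be driven through its main method once the hudi-flink module is on the classpath. The table base path and option values are illustrative only; any flag declared in FlinkCompactionConfig can be passed the same way.]

    import org.apache.hudi.sink.compact.HoodieFlinkCompactor;

    // Minimal driver for the standalone compaction job (illustrative values).
    public class HoodieFlinkCompactorExample {
      public static void main(String[] args) throws Exception {
        HoodieFlinkCompactor.main(new String[] {
            "--path", "hdfs:///tmp/hoodie/t1",        // required: base path of the target MOR table
            "--compaction-delta-commits", "1",        // compact after every delta commit
            "--clean-retain-commits", "10"            // commits retained when cleaning runs
        });
      }
    }
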
+ */ +public class HoodieFlinkCompactor { + + protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class); + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + FlinkCompactionConfig cfg = new FlinkCompactionConfig(); + JCommander cmd = new JCommander(cfg, null, args); + if (cfg.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + + Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg); + + // create metaClient + HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf); + + // get the table name + conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); + + // set table schema + CompactionUtil.setAvroSchema(conf, metaClient); + + // judge whether have operation + // to compute the compaction instant time and do compaction. + String instantTime = CompactionUtil.getCompactionInstantTime(metaClient); + HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); + writeClient.scheduleCompactionAtInstant(instantTime, Option.empty()); + + HoodieFlinkTable<?> table = writeClient.getHoodieTable(); + // the last instant takes the highest priority. + Option<HoodieInstant> compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant(); + String compactionInstantTime = compactionInstant.isPresent() ? compactionInstant.get().getTimestamp() : null; + if (compactionInstantTime == null) { + // do nothing. + LOG.info("No compaction plan for this job "); + return; + } + // generate compaction plan + // should support configurable commit metadata + HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( + table.getMetaClient(), compactionInstantTime); + + if (compactionPlan == null || (compactionPlan.getOperations() == null) + || (compactionPlan.getOperations().isEmpty())) { + // No compaction plan, do nothing and return. + LOG.info("No compaction plan for this job and instant " + compactionInstantTime); + return; + } + + HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); + if (!pendingCompactionTimeline.containsInstant(instant)) { + // this means that the compaction plan was written to auxiliary path(.tmp) + // but not the meta path(.hoodie), this usually happens when the job crush + // exceptionally. + + // clean the compaction plan in auxiliary path and cancels the compaction. 
+ + LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" + + "Clean the compaction plan in auxiliary path and cancels the compaction"); + cleanInstant(table.getMetaClient(), instant); + return; + } + + env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime)) + .name("compaction_source") + .uid("uid_compaction_source") + .rebalance() + .transform("compact_task", + TypeInformation.of(CompactionCommitEvent.class), + new ProcessOperator<>(new NonKeyedCompactFunction(conf))) + .setParallelism(compactionPlan.getOperations().size()) + .addSink(new CompactionCommitSink(conf)) + .name("clean_commits") + .uid("uid_clean_commits") + .setParallelism(1); + + env.execute("flink_hudi_compaction"); + } + + private static void cleanInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) { + Path commitFilePath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()); + try { + if (metaClient.getFs().exists(commitFilePath)) { + boolean deleted = metaClient.getFs().delete(commitFilePath, false); + if (deleted) { + LOG.info("Removed instant " + instant); + } else { + throw new HoodieIOException("Could not delete instant " + instant); + } + } + } catch (IOException e) { + throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e); + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/NonKeyedCompactFunction.java similarity index 64% copy from hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java copy to hudi-flink/src/main/java/org/apache/hudi/sink/compact/NonKeyedCompactFunction.java index ee8678b..f1be78c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/NonKeyedCompactFunction.java @@ -28,19 +28,20 @@ import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.List; /** * Function to execute the actual compaction task assigned by the compaction plan task. - * In order to execute scalable, the input should shuffle by the compact event {@link CompactionPlanEvent}. + * The input compact event {@link CompactionPlanEvent}s were distributed evenly to this function. */ -public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEvent, CompactionCommitEvent> { - private static final Logger LOG = LoggerFactory.getLogger(CompactFunction.class); +public class NonKeyedCompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> { + private static final Logger LOG = LoggerFactory.getLogger(NonKeyedCompactFunction.class); /** * Config options. 
@@ -62,7 +63,7 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv */ private transient NonThrownExecutor executor; - public CompactFunction(Configuration conf) { + public NonKeyedCompactFunction(Configuration conf) { this.conf = conf; } @@ -74,25 +75,24 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv } @Override - public void processElement(CompactionPlanEvent event, Context context, Collector<CompactionCommitEvent> collector) throws Exception { + public void processElement(CompactionPlanEvent event, Context ctx, Collector<CompactionCommitEvent> collector) throws Exception { final String instantTime = event.getCompactionInstantTime(); final CompactionOperation compactionOperation = event.getOperation(); - // executes the compaction task asynchronously to not block the checkpoint barrier propagate. - executor.execute( - () -> { - HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); - List<WriteStatus> writeStatuses = compactor.compact( - new HoodieFlinkCopyOnWriteTable<>( - this.writeClient.getConfig(), - this.writeClient.getEngineContext(), - this.writeClient.getHoodieTable().getMetaClient()), - this.writeClient.getHoodieTable().getMetaClient(), - this.writeClient.getConfig(), - compactionOperation, - instantTime); - collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID)); - }, "Execute compaction for instant %s from task %d", instantTime, taskID - ); + doCompaction(instantTime, compactionOperation, collector); + } + + private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException { + HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor(); + List<WriteStatus> writeStatuses = compactor.compact( + new HoodieFlinkCopyOnWriteTable<>( + this.writeClient.getConfig(), + this.writeClient.getEngineContext(), + this.writeClient.getHoodieTable().getMetaClient()), + this.writeClient.getHoodieTable().getMetaClient(), + this.writeClient.getConfig(), + compactionOperation, + instantTime); + collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID)); } @VisibleForTesting diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 3b43a0d..843e9bf 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -21,13 +21,17 @@ package org.apache.hudi.streamer; import org.apache.hudi.client.utils.OperationConverter; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.keygen.constant.KeyGeneratorType; +import org.apache.hudi.util.StreamerUtil; import com.beust.jcommander.Parameter; import org.apache.flink.configuration.Configuration; -import org.apache.hudi.keygen.constant.KeyGeneratorType; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Configurations for Hoodie Flink streamer. 
@@ -124,4 +128,35 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks that do actual write, default is 4.") public Integer writeTaskNum = 4; + + /** + * Transforms a {@code HoodieFlinkStreamer.Config} into {@code Configuration}. + * The latter is more suitable for the table APIs. It reads all the properties + * in the properties file (set by `--props` option) and cmd line options + * (set by `--hoodie-conf` option). + */ + @SuppressWarnings("unchecked, rawtypes") + public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkStreamerConfig config) { + Map<String, String> propsMap = new HashMap<String, String>((Map) StreamerUtil.getProps(config)); + org.apache.flink.configuration.Configuration conf = fromMap(propsMap); + + conf.setString(FlinkOptions.PATH, config.targetBasePath); + conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, config.readSchemaFilePath); + conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName); + // copy_on_write works same as COPY_ON_WRITE + conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase()); + conf.setString(FlinkOptions.OPERATION, config.operation.value()); + conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField); + conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName); + conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes); + conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes)); + conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval)); + conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors); + conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField); + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField); + conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass); + conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum); + + return conf; + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java index 9e77e73..eaca4c9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java @@ -23,11 +23,11 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.sink.StreamWriteOperatorFactory; import org.apache.hudi.sink.bootstrap.BootstrapFunction; +import org.apache.hudi.sink.compact.CompactFunction; import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactionCommitSink; -import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.compact.CompactionPlanEvent; -import org.apache.hudi.sink.compact.CompactFunction; +import org.apache.hudi.sink.compact.CompactionPlanOperator; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.transform.RowDataToHoodieFunction; import org.apache.hudi.util.AvroSchemaConverter; @@ -81,7 +81,7 @@ public class HoodieFlinkStreamer { RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg)) .getLogicalType(); - Configuration conf = FlinkOptions.fromStreamerConfig(cfg); + Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg); int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS); 
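[Caller sketch: the streamer-config-to-Flink-Configuration conversion now lives on FlinkStreamerConfig, since FlinkOptions.fromStreamerConfig is removed in this commit. A minimal caller after the change looks like the following; the method name assumes only what the diff above introduces.]

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.streamer.FlinkStreamerConfig;

    // Replaces the old FlinkOptions.fromStreamerConfig(cfg) call sites.
    public class StreamerConfigConversionExample {
      public static Configuration convert(FlinkStreamerConfig cfg) {
        return FlinkStreamerConfig.toFlinkConfig(cfg);
      }
    }
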
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java new file mode 100644 index 0000000..5ffd58d --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.configuration.FlinkOptions; + +import org.apache.avro.Schema; +import org.apache.flink.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Utilities for flink hudi compaction. + */ +public class CompactionUtil { + + private static final Logger LOG = LoggerFactory.getLogger(CompactionUtil.class); + + /** + * Creates the metaClient. + * */ + public static HoodieTableMetaClient createMetaClient(Configuration conf) { + return HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(FlinkClientUtil.getHadoopConf()).build(); + } + + /** + * Gets compaction Instant time. 
+ * */ + public static String getCompactionInstantTime(HoodieTableMetaClient metaClient) { + Option<HoodieInstant> hoodieInstantOption = metaClient.getCommitsTimeline().filterPendingExcludingCompaction().firstInstant(); + if (hoodieInstantOption.isPresent()) { + HoodieInstant firstInstant = hoodieInstantOption.get(); + String newCommitTime = StreamerUtil.instantTimeSubtract(firstInstant.getTimestamp(), 1); + // Committed and pending compaction instants should have strictly lower timestamps + List<HoodieInstant> conflictingInstants = metaClient.getActiveTimeline() + .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants() + .filter(instant -> HoodieTimeline.compareTimestamps( + instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, newCommitTime)) + .collect(Collectors.toList()); + ValidationUtils.checkArgument(conflictingInstants.isEmpty(), + "Following instants have timestamps >= compactionInstant (" + newCommitTime + ") Instants :" + + conflictingInstants); + return newCommitTime; + } else { + return HoodieActiveTimeline.createNewInstantTime(); + } + } + + /** + * Sets up the avro schema string into the give configuration {@code conf} + * through reading from the hoodie table metadata. + * + * @param conf The configuration + */ + public static void setAvroSchema(Configuration conf, HoodieTableMetaClient metaClient) throws Exception { + TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); + Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false); + conf.setString(FlinkOptions.READ_AVRO_SCHEMA, tableAvroSchema.toString()); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index b9b6ce9..caecba4 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -90,7 +90,7 @@ public class StreamerUtil { } public static Schema getSourceSchema(FlinkStreamerConfig cfg) { - return new FilebasedSchemaProvider(FlinkOptions.fromStreamerConfig(cfg)).getSourceSchema(); + return new FilebasedSchemaProvider(FlinkStreamerConfig.toFlinkConfig(cfg)).getSourceSchema(); } public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) { @@ -150,7 +150,7 @@ public class StreamerUtil { } public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig conf) { - return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf)); + return getHoodieClientConfig(FlinkStreamerConfig.toFlinkConfig(conf)); } public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) { @@ -302,4 +302,12 @@ public class StreamerUtil { long oldTime = Long.parseLong(oldInstant); return String.valueOf(oldTime + milliseconds); } + + /** + * Subtract the old instant time with given milliseconds and returns. 
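[Worked example of the instant-time arithmetic used by getCompactionInstantTime above; the timestamp value is illustrative.]

    import org.apache.hudi.util.StreamerUtil;

    // If the earliest pending delta commit is "20210613150446", the compaction is
    // scheduled at the numerically preceding instant, so it sorts strictly before
    // every inflight write on the timeline.
    public class InstantTimeExample {
      public static void main(String[] args) {
        String earliestPending = "20210613150446";                          // illustrative instant time
        String compactionInstant = StreamerUtil.instantTimeSubtract(earliestPending, 1);
        System.out.println(compactionInstant);                              // prints 20210613150445
      }
    }
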
+ * */ + public static String instantTimeSubtract(String oldInstant, long milliseconds) { + long oldTime = Long.parseLong(oldInstant); + return String.valueOf(oldTime - milliseconds); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java index ad9c9dc..b001c54 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java @@ -18,8 +18,15 @@ package org.apache.hudi.sink; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CompactionUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.bootstrap.BootstrapFunction; import org.apache.hudi.sink.compact.CompactFunction; @@ -27,10 +34,15 @@ import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactionCommitSink; import org.apache.hudi.sink.compact.CompactionPlanEvent; import org.apache.hudi.sink.compact.CompactionPlanOperator; +import org.apache.hudi.sink.compact.CompactionPlanSourceFunction; +import org.apache.hudi.sink.compact.FlinkCompactionConfig; +import org.apache.hudi.sink.compact.NonKeyedCompactFunction; import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketAssignOperator; import org.apache.hudi.sink.transform.RowDataToHoodieFunction; +import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; @@ -50,13 +62,20 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; -import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.api.operators.ProcessOperator; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.TestLogger; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.nio.charset.StandardCharsets; @@ -72,6 +91,8 @@ import java.util.concurrent.TimeUnit; */ public class StreamWriteITCase extends TestLogger { + protected static final Logger LOG = LoggerFactory.getLogger(StreamWriteITCase.class); + private static final Map<String, List<String>> EXPECTED = new HashMap<>(); static { 
@@ -148,6 +169,83 @@ public class StreamWriteITCase extends TestLogger { } @Test + public void testHoodieFlinkCompactor() throws Exception { + // Create hoodie table and insert into data. + EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); + TableEnvironment tableEnv = TableEnvironmentImpl.create(settings); + tableEnv.getConfig().getConfiguration() + .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + Map<String, String> options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + tableEnv.executeSql(hoodieTableDDL); + String insertInto = "insert into t1 values\n" + + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n" + + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n" + + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n" + + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n" + + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n" + + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n" + + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n" + + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')"; + TableResult tableResult = tableEnv.executeSql(insertInto); + TimeUnit.SECONDS.sleep(5); + tableResult.await(); + + // Make configuration and setAvroSchema. + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + FlinkCompactionConfig cfg = new FlinkCompactionConfig(); + cfg.path = tempFile.getAbsolutePath(); + Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg); + conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); + conf.setString(FlinkOptions.PARTITION_PATH_FIELD.key(), "partition"); + + // create metaClient + HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf); + + // set the table name + conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); + + // set table schema + CompactionUtil.setAvroSchema(conf, metaClient); + + // judge whether have operation + // To compute the compaction instant time and do compaction. + String instantTime = CompactionUtil.getCompactionInstantTime(metaClient); + HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); + writeClient.scheduleCompactionAtInstant(instantTime, Option.empty()); + + HoodieFlinkTable<?> table = writeClient.getHoodieTable(); + // the last instant takes the highest priority. 
+ Option<HoodieInstant> compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant(); + String compactionInstantTime = compactionInstant.get().getTimestamp(); + + // generate compaction plan + // should support configurable commit metadata + HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( + table.getMetaClient(), compactionInstantTime); + + HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + + env.addSource(new CompactionPlanSourceFunction(table, instant, compactionPlan, compactionInstantTime)) + .name("compaction_source") + .uid("uid_compaction_source") + .rebalance() + .transform("compact_task", + TypeInformation.of(CompactionCommitEvent.class), + new ProcessOperator<>(new NonKeyedCompactFunction(conf))) + .setParallelism(compactionPlan.getOperations().size()) + .addSink(new CompactionCommitSink(conf)) + .name("clean_commits") + .uid("uid_clean_commits") + .setParallelism(1); + + env.execute("flink_hudi_compaction"); + TestData.checkWrittenFullData(tempFile, EXPECTED); + } + + @Test public void testMergeOnReadWriteWithCompaction() throws Exception { Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
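[Closing sketch for the testHoodieFlinkCompactor pipeline above: besides TestData.checkWrittenFullData, the outcome could also be asserted against the timeline. The timeline calls below come from existing Hudi classes rather than this commit, so treat the exact API usage as an assumption.]

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.util.CompactionUtil;

    // Rough post-condition check: after env.execute() the requested compaction
    // instant should appear as a completed commit on a freshly loaded timeline.
    public class CompactionVerificationSketch {
      public static boolean isCompacted(org.apache.flink.configuration.Configuration conf, String compactionInstantTime) {
        HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf);
        HoodieTimeline completed = metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
        return completed.containsInstant(
            new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, compactionInstantTime));
      }
    }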