danny0405 commented on code in PR #5890: URL: https://github.com/apache/hudi/pull/5890#discussion_r902392796
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java: ########## @@ -100,15 +105,25 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter; private transient HoodieFlinkWriteClient writeClient; private transient BulkInsertWriterHelper writerHelper; - private transient String instantTime; private transient BinaryExternalSorter sorter; private transient StreamRecordCollector<ClusteringCommitEvent> collector; private transient BinaryRowDataSerializer binarySerializer; + /** + * Whether to execute compaction asynchronously. + */ Review Comment: compaction -> clustering ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.clustering; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.common.model.ClusteringGroupInfo; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.util.ClusteringUtil; +import org.apache.hudi.util.FlinkTables; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** + * Operator that generates the clustering plan with pluggable strategies on finished checkpoints. + * + * <p>It should be singleton to avoid conflicts. + */ +public class ClusteringPlanOperator extends AbstractStreamOperator<ClusteringPlanEvent> + implements OneInputStreamOperator<Object, ClusteringPlanEvent> { + + /** + * Config options. + */ + private final Configuration conf; + + /** + * Meta Client. + */ + @SuppressWarnings("rawtypes") + private transient HoodieFlinkTable table; + + public ClusteringPlanOperator(Configuration conf) { + this.conf = conf; + } + + @Override + public void open() throws Exception { + super.open(); + this.table = FlinkTables.createTable(conf, getRuntimeContext()); + // when starting up, rolls back all the inflight clustering instants if there exists, + // these instants are in priority for scheduling task because the clustering instants are + // scheduled from earliest(FIFO sequence). + ClusteringUtil.rollbackClustering(table); + } + + @Override + public void processElement(StreamRecord<Object> streamRecord) { + // no operation + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + try { + table.getMetaClient().reloadActiveTimeline(); + scheduleClustering(table, checkpointId); + } catch (Throwable throwable) { + // make it fail-safe + LOG.error("Error while scheduling clustering plan for checkpoint: " + checkpointId, throwable); + } + } + + private void scheduleClustering(HoodieFlinkTable<?> table, long checkpointId) { + // the first instant takes the highest priority. + Option<HoodieInstant> firstRequested = table.getActiveTimeline().filterPendingReplaceTimeline() + .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant(); + if (!firstRequested.isPresent()) { Review Comment: We should use `ClusteringUtils.getPendingClusteringInstantTimes` to fetch the clustering instants, the clustering instant uses the `REPLACE_COMMIT_ACTION` as action which is same with `INSERT OVERWRITE`, we should distinguish here to avoid ambiguity ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java: ########## @@ -45,148 +49,290 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + /** * Flink hudi clustering program that can be executed manually. */ public class HoodieFlinkClusteringJob { protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkClusteringJob.class); + /** + * Flink Execution Environment. + */ + private final AsyncClusteringService clusteringScheduleService; + + public HoodieFlinkClusteringJob(AsyncClusteringService service) { + this.clusteringScheduleService = service; + } + public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + FlinkClusteringConfig cfg = getFlinkClusteringConfig(args); + Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg); + + AsyncClusteringService service = new AsyncClusteringService(cfg, conf, env); + + new HoodieFlinkClusteringJob(service).start(cfg.serviceMode); + } + + /** + * Main method to start clustering service. + */ + public void start(boolean serviceMode) throws Exception { + if (serviceMode) { + clusteringScheduleService.start(null); + try { + clusteringScheduleService.waitForShutdown(); + } catch (Exception e) { + throw new HoodieException(e.getMessage(), e); + } finally { + LOG.info("Shut down hoodie flink clustering"); + } + } else { + LOG.info("Hoodie Flink Clustering running only single round"); + try { + clusteringScheduleService.cluster(); + } catch (Exception e) { + LOG.error("Got error running delta sync once. Shutting down", e); + throw e; + } finally { + LOG.info("Shut down hoodie flink clustering"); + } + } + } + + public static FlinkClusteringConfig getFlinkClusteringConfig(String[] args) { FlinkClusteringConfig cfg = new FlinkClusteringConfig(); JCommander cmd = new JCommander(cfg, null, args); if (cfg.help || args.length == 0) { cmd.usage(); System.exit(1); } + return cfg; + } - Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg); - - // create metaClient - HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); - - // set table name - conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); - - // set table type - conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name()); - - // set record key field - conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp()); + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + /** + * Schedules clustering in service. + */ + public static class AsyncClusteringService extends HoodieAsyncTableService { + private static final long serialVersionUID = 1L; + + /** + * Flink Clustering Config. + */ + private final FlinkClusteringConfig cfg; + + /** + * Flink Config. + */ + private final Configuration conf; + + /** + * Meta Client. + */ + private final HoodieTableMetaClient metaClient; + + /** + * Write Client. + */ + private final HoodieFlinkWriteClient<?> writeClient; + + /** + * The hoodie table. + */ + private final HoodieFlinkTable<?> table; + + /** + * Flink Execution Environment. + */ + private final StreamExecutionEnvironment env; + + /** + * Executor Service. + */ + private final ExecutorService executor; + + public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception { + this.cfg = cfg; + this.conf = conf; + this.env = env; + this.executor = Executors.newFixedThreadPool(1); + + // create metaClient + this.metaClient = StreamerUtil.createMetaClient(conf); + + // set table name + conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); + + // set table type + conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name()); + + // set record key field + conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp()); + + // set partition field + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp()); + + // set table schema + CompactionUtil.setAvroSchema(conf, metaClient); + + this.writeClient = StreamerUtil.createWriteClient(conf); + this.writeConfig = writeClient.getConfig(); + this.table = writeClient.getHoodieTable(); + } - // set partition field - conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp()); + @Override + protected Pair<CompletableFuture, ExecutorService> startService() { + return Pair.of(CompletableFuture.supplyAsync(() -> { + boolean error = false; + + try { + while (!isShutdownRequested()) { + try { + cluster(); + Thread.sleep(cfg.minClusteringIntervalSeconds * 1000); + } catch (Exception e) { + LOG.error("Shutting down clustering service due to exception", e); + error = true; + throw new HoodieException(e.getMessage(), e); + } + } + } finally { + shutdownAsyncService(error); + } + return true; + }, executor), executor); + } - // set table schema - CompactionUtil.setAvroSchema(conf, metaClient); + private void cluster() throws Exception { + table.getMetaClient().reloadActiveTimeline(); - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); - HoodieFlinkTable<?> table = writeClient.getHoodieTable(); + // judges whether there are operations + // to compute the clustering instant time and exec clustering. + if (cfg.schedule) { + ClusteringUtil.validateClusteringScheduling(conf); + String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime(); + boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); + if (!scheduled) { + // do nothing. + LOG.info("No clustering plan for this job "); + return; + } + table.getMetaClient().reloadActiveTimeline(); + } - // judge whether have operation - // to compute the clustering instant time and do cluster. - if (cfg.schedule) { - String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime(); - boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); - if (!scheduled) { + // fetch the instant based on the configured execution sequence + HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline() + .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED); + Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.clusteringSeq) ? timeline.lastInstant() : timeline.firstInstant(); + if (!requested.isPresent()) { // do nothing. - LOG.info("No clustering plan for this job "); + LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option"); return; } - } - table.getMetaClient().reloadActiveTimeline(); + HoodieInstant clusteringInstant = requested.get(); - // fetch the instant based on the configured execution sequence - HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline() - .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED); - Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.clusteringSeq) ? timeline.lastInstant() : timeline.firstInstant(); - if (!requested.isPresent()) { - // do nothing. - LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option"); - return; - } + HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp()); + if (timeline.containsInstant(inflightInstant)) { + LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]"); + table.rollbackInflightClustering(inflightInstant, + commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); + table.getMetaClient().reloadActiveTimeline(); + } - HoodieInstant clusteringInstant = requested.get(); + // generate clustering plan + // should support configurable commit metadata + Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan( + table.getMetaClient(), clusteringInstant); - HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp()); - if (timeline.containsInstant(inflightInstant)) { - LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + "]"); - table.rollbackInflightClustering(inflightInstant, - commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); - table.getMetaClient().reloadActiveTimeline(); - } + if (!clusteringPlanOption.isPresent()) { + // do nothing. + LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option"); + return; + } - // generate clustering plan - // should support configurable commit metadata - Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan( - table.getMetaClient(), clusteringInstant); + HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight(); - if (!clusteringPlanOption.isPresent()) { - // do nothing. - LOG.info("No clustering plan scheduled, turns on the clustering plan schedule with --schedule option"); - return; - } + if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null) + || (clusteringPlan.getInputGroups().isEmpty())) { + // No clustering plan, do nothing and return. + LOG.info("No clustering plan for instant " + clusteringInstant.getTimestamp()); + return; + } - HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight(); + HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp()); + HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline(); + if (!pendingClusteringTimeline.containsInstant(instant)) { + // this means that the clustering plan was written to auxiliary path(.tmp) + // but not the meta path(.hoodie), this usually happens when the job crush + // exceptionally. - if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null) - || (clusteringPlan.getInputGroups().isEmpty())) { - // No clustering plan, do nothing and return. - LOG.info("No clustering plan for instant " + clusteringInstant.getTimestamp()); - return; - } + // clean the clustering plan in auxiliary path and cancels the clustering. - HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp()); - HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline(); - if (!pendingClusteringTimeline.containsInstant(instant)) { - // this means that the clustering plan was written to auxiliary path(.tmp) - // but not the meta path(.hoodie), this usually happens when the job crush - // exceptionally. + LOG.warn("The clustering plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" + + "Clean the clustering plan in auxiliary path and cancels the clustering"); + CompactionUtil.cleanInstant(table.getMetaClient(), instant); + return; + } - // clean the clustering plan in auxiliary path and cancels the clustering. + // get clusteringParallelism. + int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1 + ? clusteringPlan.getInputGroups().size() : conf.getInteger(FlinkOptions.CLUSTERING_TASKS); - LOG.warn("The clustering plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" - + "Clean the clustering plan in auxiliary path and cancels the clustering"); - CompactionUtil.cleanInstant(table.getMetaClient(), instant); - return; - } + // Mark instant as clustering inflight + table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty()); - // get clusteringParallelism. - int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1 - ? clusteringPlan.getInputGroups().size() : conf.getInteger(FlinkOptions.CLUSTERING_TASKS); + final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false); + final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema); + final RowType rowType = (RowType) rowDataType.getLogicalType(); - // Mark instant as clustering inflight - table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty()); + // setup configuration + long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout(); + conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); - final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false); - final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema); - final RowType rowType = (RowType) rowDataType.getLogicalType(); + DataStream<ClusteringCommitEvent> dataStream = env.addSource(new ClusteringPlanSourceFunction(timeline.lastInstant().get(), clusteringPlan)) + .name("clustering_source") + .uid("uid_clustering_source") Review Comment: Is `timeline.lastInstant().get()` the timestamp we want to cluster ? I don't think so. ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.table.HoodieFlinkTable; + +import org.apache.flink.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utilities for flink hudi clustering. + */ +public class ClusteringUtil { + + private static final Logger LOG = LoggerFactory.getLogger(ClusteringUtil.class); + + public static void validateClusteringScheduling(Configuration conf) { + if (OptionsResolver.isBucketIndexType(conf)) { + throw new UnsupportedOperationException("Clustering is not supported for bucket index."); + } + } + + /** + * Schedules clustering plan by condition. + * + * @param conf The configuration + * @param writeClient The write client + * @param committed Whether the instant was committed + */ + public static void scheduleClustering(Configuration conf, HoodieFlinkWriteClient writeClient, boolean committed) { + validateClusteringScheduling(conf); + if (committed) { + writeClient.scheduleClustering(Option.empty()); + } + } + + /** + * Force rolls back all the inflight clustering instants, especially for job failover restart. + * + * @param table The hoodie table + */ + public static void rollbackClustering(HoodieFlinkTable<?> table) { + HoodieTimeline inflightClusteringTimeline = table.getActiveTimeline() + .filterPendingCompactionTimeline() Review Comment: The timeline is wrong, you may need to fetch the clustering plan with REQUESTED instant to decide this is a clustering instant. ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java: ########## @@ -45,148 +49,290 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + /** * Flink hudi clustering program that can be executed manually. */ public class HoodieFlinkClusteringJob { protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkClusteringJob.class); + /** + * Flink Execution Environment. + */ + private final AsyncClusteringService clusteringScheduleService; + + public HoodieFlinkClusteringJob(AsyncClusteringService service) { + this.clusteringScheduleService = service; + } + public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + FlinkClusteringConfig cfg = getFlinkClusteringConfig(args); + Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg); + + AsyncClusteringService service = new AsyncClusteringService(cfg, conf, env); + + new HoodieFlinkClusteringJob(service).start(cfg.serviceMode); + } + + /** + * Main method to start clustering service. + */ + public void start(boolean serviceMode) throws Exception { + if (serviceMode) { + clusteringScheduleService.start(null); + try { + clusteringScheduleService.waitForShutdown(); + } catch (Exception e) { + throw new HoodieException(e.getMessage(), e); + } finally { + LOG.info("Shut down hoodie flink clustering"); + } + } else { + LOG.info("Hoodie Flink Clustering running only single round"); + try { + clusteringScheduleService.cluster(); + } catch (Exception e) { + LOG.error("Got error running delta sync once. Shutting down", e); + throw e; + } finally { + LOG.info("Shut down hoodie flink clustering"); + } + } + } + + public static FlinkClusteringConfig getFlinkClusteringConfig(String[] args) { FlinkClusteringConfig cfg = new FlinkClusteringConfig(); JCommander cmd = new JCommander(cfg, null, args); if (cfg.help || args.length == 0) { cmd.usage(); System.exit(1); } + return cfg; + } - Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg); - - // create metaClient - HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); - - // set table name - conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); - - // set table type - conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name()); - - // set record key field - conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp()); + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + /** + * Schedules clustering in service. + */ + public static class AsyncClusteringService extends HoodieAsyncTableService { + private static final long serialVersionUID = 1L; + + /** + * Flink Clustering Config. + */ + private final FlinkClusteringConfig cfg; + + /** + * Flink Config. + */ + private final Configuration conf; + + /** + * Meta Client. + */ + private final HoodieTableMetaClient metaClient; + + /** + * Write Client. + */ + private final HoodieFlinkWriteClient<?> writeClient; + + /** + * The hoodie table. + */ + private final HoodieFlinkTable<?> table; + + /** + * Flink Execution Environment. + */ + private final StreamExecutionEnvironment env; + + /** + * Executor Service. + */ + private final ExecutorService executor; + + public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration conf, StreamExecutionEnvironment env) throws Exception { + this.cfg = cfg; + this.conf = conf; + this.env = env; + this.executor = Executors.newFixedThreadPool(1); + + // create metaClient + this.metaClient = StreamerUtil.createMetaClient(conf); + + // set table name + conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); + + // set table type + conf.setString(FlinkOptions.TABLE_TYPE, metaClient.getTableConfig().getTableType().name()); + + // set record key field + conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp()); + + // set partition field + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp()); + + // set table schema + CompactionUtil.setAvroSchema(conf, metaClient); + + this.writeClient = StreamerUtil.createWriteClient(conf); + this.writeConfig = writeClient.getConfig(); + this.table = writeClient.getHoodieTable(); + } - // set partition field - conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp()); + @Override + protected Pair<CompletableFuture, ExecutorService> startService() { + return Pair.of(CompletableFuture.supplyAsync(() -> { + boolean error = false; + + try { + while (!isShutdownRequested()) { + try { + cluster(); + Thread.sleep(cfg.minClusteringIntervalSeconds * 1000); + } catch (Exception e) { + LOG.error("Shutting down clustering service due to exception", e); + error = true; + throw new HoodieException(e.getMessage(), e); + } + } + } finally { + shutdownAsyncService(error); + } + return true; + }, executor), executor); + } - // set table schema - CompactionUtil.setAvroSchema(conf, metaClient); + private void cluster() throws Exception { + table.getMetaClient().reloadActiveTimeline(); - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); - HoodieFlinkTable<?> table = writeClient.getHoodieTable(); + // judges whether there are operations + // to compute the clustering instant time and exec clustering. + if (cfg.schedule) { + ClusteringUtil.validateClusteringScheduling(conf); + String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime(); + boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); + if (!scheduled) { + // do nothing. + LOG.info("No clustering plan for this job "); + return; + } + table.getMetaClient().reloadActiveTimeline(); + } - // judge whether have operation - // to compute the clustering instant time and do cluster. - if (cfg.schedule) { - String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime(); - boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); - if (!scheduled) { + // fetch the instant based on the configured execution sequence + HoodieTimeline timeline = table.getActiveTimeline().filterPendingReplaceTimeline() + .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED); Review Comment: We should use ClusteringUtils.getPendingClusteringInstantTimes to fetch the clustering instants, the clustering instant uses the REPLACE_COMMIT_ACTION as action which is same with INSERT OVERWRITE, we should distinguish here to avoid ambiguity -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
