yuzhaojing commented on code in PR #5890:
URL: https://github.com/apache/hudi/pull/5890#discussion_r902557363
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java:
##########
@@ -45,148 +49,290 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
/**
* Flink hudi clustering program that can be executed manually.
*/
public class HoodieFlinkClusteringJob {
protected static final Logger LOG =
LoggerFactory.getLogger(HoodieFlinkClusteringJob.class);
+ /**
+ * Flink Execution Environment.
+ */
+ private final AsyncClusteringService clusteringScheduleService;
+
+ public HoodieFlinkClusteringJob(AsyncClusteringService service) {
+ this.clusteringScheduleService = service;
+ }
+
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ FlinkClusteringConfig cfg = getFlinkClusteringConfig(args);
+ Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+ AsyncClusteringService service = new AsyncClusteringService(cfg, conf,
env);
+
+ new HoodieFlinkClusteringJob(service).start(cfg.serviceMode);
+ }
+
+ /**
+ * Main method to start clustering service.
+ */
+ public void start(boolean serviceMode) throws Exception {
+ if (serviceMode) {
+ clusteringScheduleService.start(null);
+ try {
+ clusteringScheduleService.waitForShutdown();
+ } catch (Exception e) {
+ throw new HoodieException(e.getMessage(), e);
+ } finally {
+ LOG.info("Shut down hoodie flink clustering");
+ }
+ } else {
+ LOG.info("Hoodie Flink Clustering running only single round");
+ try {
+ clusteringScheduleService.cluster();
+ } catch (Exception e) {
+ LOG.error("Got error running delta sync once. Shutting down", e);
+ throw e;
+ } finally {
+ LOG.info("Shut down hoodie flink clustering");
+ }
+ }
+ }
+
+ public static FlinkClusteringConfig getFlinkClusteringConfig(String[] args) {
FlinkClusteringConfig cfg = new FlinkClusteringConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
+ return cfg;
+ }
- Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
-
- // create metaClient
- HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
-
- // set table name
- conf.setString(FlinkOptions.TABLE_NAME,
metaClient.getTableConfig().getTableName());
-
- // set table type
- conf.setString(FlinkOptions.TABLE_TYPE,
metaClient.getTableConfig().getTableType().name());
-
- // set record key field
- conf.setString(FlinkOptions.RECORD_KEY_FIELD,
metaClient.getTableConfig().getRecordKeyFieldProp());
+ // -------------------------------------------------------------------------
+ // Inner Class
+ // -------------------------------------------------------------------------
+
+ /**
+ * Schedules clustering in service.
+ */
+ public static class AsyncClusteringService extends HoodieAsyncTableService {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Flink Clustering Config.
+ */
+ private final FlinkClusteringConfig cfg;
+
+ /**
+ * Flink Config.
+ */
+ private final Configuration conf;
+
+ /**
+ * Meta Client.
+ */
+ private final HoodieTableMetaClient metaClient;
+
+ /**
+ * Write Client.
+ */
+ private final HoodieFlinkWriteClient<?> writeClient;
+
+ /**
+ * The hoodie table.
+ */
+ private final HoodieFlinkTable<?> table;
+
+ /**
+ * Flink Execution Environment.
+ */
+ private final StreamExecutionEnvironment env;
+
+ /**
+ * Executor Service.
+ */
+ private final ExecutorService executor;
+
+ public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration
conf, StreamExecutionEnvironment env) throws Exception {
+ this.cfg = cfg;
+ this.conf = conf;
+ this.env = env;
+ this.executor = Executors.newFixedThreadPool(1);
+
+ // create metaClient
+ this.metaClient = StreamerUtil.createMetaClient(conf);
+
+ // set table name
+ conf.setString(FlinkOptions.TABLE_NAME,
metaClient.getTableConfig().getTableName());
+
+ // set table type
+ conf.setString(FlinkOptions.TABLE_TYPE,
metaClient.getTableConfig().getTableType().name());
+
+ // set record key field
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD,
metaClient.getTableConfig().getRecordKeyFieldProp());
+
+ // set partition field
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD,
metaClient.getTableConfig().getPartitionFieldProp());
+
+ // set table schema
+ CompactionUtil.setAvroSchema(conf, metaClient);
+
+ this.writeClient = StreamerUtil.createWriteClient(conf);
+ this.writeConfig = writeClient.getConfig();
+ this.table = writeClient.getHoodieTable();
+ }
- // set partition field
- conf.setString(FlinkOptions.PARTITION_PATH_FIELD,
metaClient.getTableConfig().getPartitionFieldProp());
+ @Override
+ protected Pair<CompletableFuture, ExecutorService> startService() {
+ return Pair.of(CompletableFuture.supplyAsync(() -> {
+ boolean error = false;
+
+ try {
+ while (!isShutdownRequested()) {
+ try {
+ cluster();
+ Thread.sleep(cfg.minClusteringIntervalSeconds * 1000);
+ } catch (Exception e) {
+ LOG.error("Shutting down clustering service due to exception",
e);
+ error = true;
+ throw new HoodieException(e.getMessage(), e);
+ }
+ }
+ } finally {
+ shutdownAsyncService(error);
+ }
+ return true;
+ }, executor), executor);
+ }
- // set table schema
- CompactionUtil.setAvroSchema(conf, metaClient);
+ private void cluster() throws Exception {
+ table.getMetaClient().reloadActiveTimeline();
- HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
- HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+ // judges whether there are operations
+ // to compute the clustering instant time and exec clustering.
+ if (cfg.schedule) {
+ ClusteringUtil.validateClusteringScheduling(conf);
+ String clusteringInstantTime =
HoodieActiveTimeline.createNewInstantTime();
+ boolean scheduled =
writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+ if (!scheduled) {
+ // do nothing.
+ LOG.info("No clustering plan for this job ");
+ return;
+ }
+ table.getMetaClient().reloadActiveTimeline();
+ }
- // judge whether have operation
- // to compute the clustering instant time and do cluster.
- if (cfg.schedule) {
- String clusteringInstantTime =
HoodieActiveTimeline.createNewInstantTime();
- boolean scheduled =
writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
- if (!scheduled) {
+ // fetch the instant based on the configured execution sequence
+ HoodieTimeline timeline =
table.getActiveTimeline().filterPendingReplaceTimeline()
+ .filter(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED);
+ Option<HoodieInstant> requested =
CompactionUtil.isLIFO(cfg.clusteringSeq) ? timeline.lastInstant() :
timeline.firstInstant();
+ if (!requested.isPresent()) {
// do nothing.
- LOG.info("No clustering plan for this job ");
+ LOG.info("No clustering plan scheduled, turns on the clustering plan
schedule with --schedule option");
return;
}
- }
- table.getMetaClient().reloadActiveTimeline();
+ HoodieInstant clusteringInstant = requested.get();
- // fetch the instant based on the configured execution sequence
- HoodieTimeline timeline =
table.getActiveTimeline().filterPendingReplaceTimeline()
- .filter(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED);
- Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.clusteringSeq)
? timeline.lastInstant() : timeline.firstInstant();
- if (!requested.isPresent()) {
- // do nothing.
- LOG.info("No clustering plan scheduled, turns on the clustering plan
schedule with --schedule option");
- return;
- }
+ HoodieInstant inflightInstant =
HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
+ if (timeline.containsInstant(inflightInstant)) {
+ LOG.info("Rollback inflight clustering instant: [" + clusteringInstant
+ "]");
+ table.rollbackInflightClustering(inflightInstant,
+ commitToRollback ->
writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback,
false));
+ table.getMetaClient().reloadActiveTimeline();
+ }
- HoodieInstant clusteringInstant = requested.get();
+ // generate clustering plan
+ // should support configurable commit metadata
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption =
ClusteringUtils.getClusteringPlan(
+ table.getMetaClient(), clusteringInstant);
- HoodieInstant inflightInstant =
HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
- if (timeline.containsInstant(inflightInstant)) {
- LOG.info("Rollback inflight clustering instant: [" + clusteringInstant +
"]");
- table.rollbackInflightClustering(inflightInstant,
- commitToRollback ->
writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback,
false));
- table.getMetaClient().reloadActiveTimeline();
- }
+ if (!clusteringPlanOption.isPresent()) {
+ // do nothing.
+ LOG.info("No clustering plan scheduled, turns on the clustering plan
schedule with --schedule option");
+ return;
+ }
- // generate clustering plan
- // should support configurable commit metadata
- Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption =
ClusteringUtils.getClusteringPlan(
- table.getMetaClient(), clusteringInstant);
+ HoodieClusteringPlan clusteringPlan =
clusteringPlanOption.get().getRight();
- if (!clusteringPlanOption.isPresent()) {
- // do nothing.
- LOG.info("No clustering plan scheduled, turns on the clustering plan
schedule with --schedule option");
- return;
- }
+ if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
+ || (clusteringPlan.getInputGroups().isEmpty())) {
+ // No clustering plan, do nothing and return.
+ LOG.info("No clustering plan for instant " +
clusteringInstant.getTimestamp());
+ return;
+ }
- HoodieClusteringPlan clusteringPlan =
clusteringPlanOption.get().getRight();
+ HoodieInstant instant =
HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
+ HoodieTimeline pendingClusteringTimeline =
table.getActiveTimeline().filterPendingReplaceTimeline();
+ if (!pendingClusteringTimeline.containsInstant(instant)) {
+ // this means that the clustering plan was written to auxiliary
path(.tmp)
+ // but not the meta path(.hoodie), this usually happens when the job
crush
+ // exceptionally.
- if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
- || (clusteringPlan.getInputGroups().isEmpty())) {
- // No clustering plan, do nothing and return.
- LOG.info("No clustering plan for instant " +
clusteringInstant.getTimestamp());
- return;
- }
+ // clean the clustering plan in auxiliary path and cancels the
clustering.
- HoodieInstant instant =
HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
- HoodieTimeline pendingClusteringTimeline =
table.getActiveTimeline().filterPendingReplaceTimeline();
- if (!pendingClusteringTimeline.containsInstant(instant)) {
- // this means that the clustering plan was written to auxiliary
path(.tmp)
- // but not the meta path(.hoodie), this usually happens when the job
crush
- // exceptionally.
+ LOG.warn("The clustering plan was fetched through the auxiliary
path(.tmp) but not the meta path(.hoodie).\n"
+ + "Clean the clustering plan in auxiliary path and cancels the
clustering");
+ CompactionUtil.cleanInstant(table.getMetaClient(), instant);
+ return;
+ }
- // clean the clustering plan in auxiliary path and cancels the
clustering.
+ // get clusteringParallelism.
+ int clusteringParallelism =
conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1
+ ? clusteringPlan.getInputGroups().size() :
conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
- LOG.warn("The clustering plan was fetched through the auxiliary
path(.tmp) but not the meta path(.hoodie).\n"
- + "Clean the clustering plan in auxiliary path and cancels the
clustering");
- CompactionUtil.cleanInstant(table.getMetaClient(), instant);
- return;
- }
+ // Mark instant as clustering inflight
+ table.getActiveTimeline().transitionReplaceRequestedToInflight(instant,
Option.empty());
- // get clusteringParallelism.
- int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS)
== -1
- ? clusteringPlan.getInputGroups().size() :
conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+ final Schema tableAvroSchema =
StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
+ final DataType rowDataType =
AvroSchemaConverter.convertToDataType(tableAvroSchema);
+ final RowType rowType = (RowType) rowDataType.getLogicalType();
- // Mark instant as clustering inflight
- table.getActiveTimeline().transitionReplaceRequestedToInflight(instant,
Option.empty());
+ // setup configuration
+ long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
+ conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
- final Schema tableAvroSchema =
StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
- final DataType rowDataType =
AvroSchemaConverter.convertToDataType(tableAvroSchema);
- final RowType rowType = (RowType) rowDataType.getLogicalType();
+ DataStream<ClusteringCommitEvent> dataStream = env.addSource(new
ClusteringPlanSourceFunction(timeline.lastInstant().get(), clusteringPlan))
+ .name("clustering_source")
+ .uid("uid_clustering_source")
Review Comment:
Use the clustering instant (`clusteringInstant`) here instead of `timeline.lastInstant().get()`.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.common.model.ClusteringGroupInfo;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.ClusteringUtil;
+import org.apache.hudi.util.FlinkTables;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Operator that generates the clustering plan with pluggable strategies on
finished checkpoints.
+ *
+ * <p>It should be singleton to avoid conflicts.
+ */
+public class ClusteringPlanOperator extends
AbstractStreamOperator<ClusteringPlanEvent>
+ implements OneInputStreamOperator<Object, ClusteringPlanEvent> {
+
+ /**
+ * Config options.
+ */
+ private final Configuration conf;
+
+ /**
+ * Meta Client.
+ */
+ @SuppressWarnings("rawtypes")
+ private transient HoodieFlinkTable table;
+
+ public ClusteringPlanOperator(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.table = FlinkTables.createTable(conf, getRuntimeContext());
+ // when starting up, rolls back all the inflight clustering instants if
there exists,
+ // these instants are in priority for scheduling task because the
clustering instants are
+ // scheduled from earliest(FIFO sequence).
+ ClusteringUtil.rollbackClustering(table);
+ }
+
+ @Override
+ public void processElement(StreamRecord<Object> streamRecord) {
+ // no operation
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ try {
+ table.getMetaClient().reloadActiveTimeline();
+ scheduleClustering(table, checkpointId);
+ } catch (Throwable throwable) {
+ // make it fail-safe
+ LOG.error("Error while scheduling clustering plan for checkpoint: " +
checkpointId, throwable);
+ }
+ }
+
+ private void scheduleClustering(HoodieFlinkTable<?> table, long
checkpointId) {
+ // the first instant takes the highest priority.
+ Option<HoodieInstant> firstRequested =
table.getActiveTimeline().filterPendingReplaceTimeline()
+ .filter(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED).firstInstant();
+ if (!firstRequested.isPresent()) {
Review Comment:
fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]