danny0405 commented on a change in pull request #3046:
URL: https://github.com/apache/hudi/pull/3046#discussion_r647299632
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
##########
@@ -78,21 +89,30 @@ public void processElement(CompactionPlanEvent event,
Context context, Collector
final String instantTime = event.getCompactionInstantTime();
final CompactionOperation compactionOperation = event.getOperation();
// executes the compaction task asynchronously to not block the checkpoint
barrier propagate.
- executor.execute(
- () -> {
- HoodieFlinkMergeOnReadTableCompactor compactor = new
HoodieFlinkMergeOnReadTableCompactor();
- List<WriteStatus> writeStatuses = compactor.compact(
- new HoodieFlinkCopyOnWriteTable<>(
- this.writeClient.getConfig(),
- this.writeClient.getEngineContext(),
- this.writeClient.getHoodieTable().getMetaClient()),
- this.writeClient.getHoodieTable().getMetaClient(),
- this.writeClient.getConfig(),
- compactionOperation,
- instantTime);
- collector.collect(new CompactionCommitEvent(instantTime,
writeStatuses, taskID));
- }, "Execute compaction for instant %s from task %d", instantTime,
taskID
- );
+ if (isAsynchronous) {
+ executor.execute(
+ () -> {
+ doCompaction(instantTime, compactionOperation, collector);
+ }, "Execute compaction for instant %s from task %d", instantTime,
taskID
+ );
+ } else {
+ // non-asynchronous way
+ doCompaction(instantTime, compactionOperation, collector);
Review comment:
synchronous way
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
##########
@@ -62,10 +63,20 @@
*/
private transient NonThrownExecutor executor;
+ /**
+ * Whether execute compactionFunction by asynchronous.
+ * */
Review comment:
Whether to execute compaction asynchronously.
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
##########
@@ -78,21 +89,30 @@ public void processElement(CompactionPlanEvent event,
Context context, Collector
final String instantTime = event.getCompactionInstantTime();
final CompactionOperation compactionOperation = event.getOperation();
// executes the compaction task asynchronously to not block the checkpoint
barrier propagate.
- executor.execute(
- () -> {
- HoodieFlinkMergeOnReadTableCompactor compactor = new
HoodieFlinkMergeOnReadTableCompactor();
- List<WriteStatus> writeStatuses = compactor.compact(
- new HoodieFlinkCopyOnWriteTable<>(
- this.writeClient.getConfig(),
- this.writeClient.getEngineContext(),
- this.writeClient.getHoodieTable().getMetaClient()),
- this.writeClient.getHoodieTable().getMetaClient(),
- this.writeClient.getConfig(),
- compactionOperation,
- instantTime);
- collector.collect(new CompactionCommitEvent(instantTime,
writeStatuses, taskID));
- }, "Execute compaction for instant %s from task %d", instantTime,
taskID
- );
+ if (isAsynchronous) {
+ executor.execute(
+ () -> {
+ doCompaction(instantTime, compactionOperation, collector);
+ }, "Execute compaction for instant %s from task %d", instantTime,
taskID
Review comment:
() -> doCompaction(instantTime, compactionOperation, collector)
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSource.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Flink hudi compaction source function, this class will be built as a flink
source operation to do flink hudi compaction.
+ * In this class, a reasonable compaction time will be calculated and the
compaction plan will be built and pass to downstream operator.
Review comment:
```java
/**
* Flink hudi compaction source function.
*
* <p>This function read the compaction plan as {@link CompactionOperation}s
then assign the compaction task
* event {@link CompactionPlanEvent} to downstream operators.
*
* <p>The compaction instant time is specified explicitly with strategies:
*
* <ul>
* <li>If the timeline has no inflight instants,
* use {@link HoodieActiveTimeline#createNewInstantTime()} as the instant
time;</li>
* <li>If the timeline has inflight instants,
* use the {earliest inflight instant time - 1ms} as the instant time.</li>
* </ul>
*/
```
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSource.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Flink hudi compaction source function, this class will be built as a flink
source operation to do flink hudi compaction.
+ * In this class, a reasonable compaction time will be calculated and the
compaction plan will be built and pass to downstream operator.
+ */
+public class CompactionCommitSource extends AbstractRichFunction implements
SourceFunction<CompactionPlanEvent> {
+
Review comment:
`CompactionCommitSource` => `CompactionPlanSourceFunction`
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.beust.jcommander.JCommander;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink hudi compaction. Independent flink hudi compaction can be done by
specifying the hudi table path and table name.
+ * */
+public class HoodieFlinkCompactor {
+
Review comment:
```java
/**
* Flink hudi compaction program that can be executed manually.
*/
```
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import com.beust.jcommander.Parameter;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Configurations for Hoodie Flink compaction.
+ */
+public class FlinkCompactionConfig extends Configuration {
+
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ // ------------------------------------------------------------------------
+ // Hudi Write Options
+ // ------------------------------------------------------------------------
+
+ @Parameter(names = {"--path"}, description = "Base path for the target
hoodie table.", required = true)
+ public String path;
+
+ @Parameter(names = {"--hoodie-table-name"}, description = "Table name to
register to Hive metastore", required = true)
+ public String hoodieTableName;
+
+ @Parameter(names = {"--write-payload-class"},
+ description = "Payload class used. Override this, if you like to roll
your own merge logic, when upserting/inserting.\n"
+ + "This will render any value set for the option in-effective",
+ required = false)
+ public String writePayloadClass =
OverwriteWithLatestAvroPayload.class.getName();
+
+ // ------------------------------------------------------------------------
+ // Compaction Options
+ // ------------------------------------------------------------------------
+
+ public static final String NUM_COMMITS = "num_commits";
+ public static final String TIME_ELAPSED = "time_elapsed";
+ public static final String NUM_AND_TIME = "num_and_time";
+ public static final String NUM_OR_TIME = "num_or_time";
+ @Parameter(names = {"--compaction-trigger-strategy"},
+ description = "Strategy to trigger compaction, options are
'num_commits': trigger compaction when reach N delta commits;\n"
+ + "'time_elapsed': trigger compaction when time elapsed > N seconds
since last compaction;\n"
+ + "'num_and_time': trigger compaction when both NUM_COMMITS and
TIME_ELAPSED are satisfied;\n"
+ + "'num_or_time': trigger compaction when NUM_COMMITS or
TIME_ELAPSED is satisfied.\n"
+ + "Default is 'num_commits'",
+ required = false)
+ public String compactionTriggerStrategy = NUM_COMMITS;
+
+ @Parameter(names = {"--compaction-delta-commits"}, description = "Max delta
commits needed to trigger compaction, default 5 commits", required = false)
+ public Integer compactionDeltaCommits = 1;
+
+ @Parameter(names = {"--compaction-delta-seconds"}, description = "Max delta
seconds time needed to trigger compaction, default 1 hour", required = false)
+ public Integer compactionDeltaSeconds = 3600;
+
+ @Parameter(names = {"--clean-async-enabled"}, description = "Whether to
cleanup the old commits immediately on new commits, enabled by default",
required = false)
+ public Boolean cleanAsyncEnable = true;
+
+ @Parameter(names = {"--clean-retain-commits"},
+ description = "Number of commits to retain. So data will be retained for
num_of_commits * time_between_commits (scheduled).\n"
+ + "This also directly translates into how much you can incrementally
pull on this table, default 10",
+ required = false)
+ public Integer cleanRetainCommits = 10;
+
+ @Parameter(names = {"--archive-min-commits"},
+ description = "Min number of commits to keep before archiving older
commits into a sequential log, default 20.",
+ required = false)
+ public Integer archiveMinCommits = 20;
+
+ @Parameter(names = {"--archive-max-commits"},
+ description = "Max number of commits to keep before archiving older
commits into a sequential log, default 30.",
+ required = false)
+ public Integer archiveMaxCommits = 30;
+
+ @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in
MB for compaction spillable map, default 100MB.", required = false)
+ public Integer compactionMaxMemory = 100;
+
+ /**
+ * Transforms a {@code HoodieFlinkCompaction.config} into {@code
Configuration}.
+ * The latter is more suitable for the table APIs. It reads all the
properties
+ * in the properties file (set by `--props` option) and cmd line options
+ * (set by `--hoodie-conf` option).
+ * */
+ public static org.apache.flink.configuration.Configuration
toCompactionConfig(FlinkCompactionConfig config) {
+ org.apache.flink.configuration.Configuration conf = new Configuration();
Review comment:
toFlinkConfig
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for Flink Hudi Compaction.
+ */
Review comment:
Compaction => compaction
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSource.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Flink hudi compaction source function, this class will be built as a flink
source operation to do flink hudi compaction.
+ * In this class, a reasonable compaction time will be calculated and the
compaction plan will be built and pass to downstream operator.
+ */
+public class CompactionCommitSource extends AbstractRichFunction implements
SourceFunction<CompactionPlanEvent> {
+
+ protected static final Logger LOG =
LoggerFactory.getLogger(CompactionCommitSource.class);
+
+ /**
+ * Config options.
+ */
+ private final Configuration conf;
+
+ /**
+ * Write client.
+ */
+ private transient HoodieFlinkWriteClient writeClient;
+
+ /**
+ * HoodieCompactionPlan.
+ * */
Review comment:
The compaction plan.
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for Flink Hudi Compaction.
+ */
+public class CompactionUtil {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CompactionUtil.class);
+
+ /**
+ * Create the metaClient.
+ * */
Review comment:
Create => Creates.
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for Flink Hudi Compaction.
+ */
+public class CompactionUtil {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CompactionUtil.class);
+
+ /**
+ * Create the metaClient.
+ * */
+ public static HoodieTableMetaClient createMetaClient(Configuration conf) {
+ return
HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(getHadoopConf()).build();
+ }
+
+ /**
+ * Get compaction Instant time.
+ * */
Review comment:
Get => Gets.
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import com.beust.jcommander.Parameter;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Configurations for Hoodie Flink compaction.
+ */
+public class FlinkCompactionConfig extends Configuration {
+
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ // ------------------------------------------------------------------------
+ // Hudi Write Options
+ // ------------------------------------------------------------------------
+
+ @Parameter(names = {"--path"}, description = "Base path for the target
hoodie table.", required = true)
+ public String path;
+
+ @Parameter(names = {"--hoodie-table-name"}, description = "Table name to
register to Hive metastore", required = true)
+ public String hoodieTableName;
+
Review comment:
Read the table name from hoodie table meta file.
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -46,6 +46,7 @@
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
+import com.beust.jcommander.internal.Nullable;
import org.apache.avro.Schema;
Review comment:
invalid import.
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for Flink Hudi Compaction.
+ */
+public class CompactionUtil {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CompactionUtil.class);
+
+ /**
+ * Create the metaClient.
+ * */
+ public static HoodieTableMetaClient createMetaClient(Configuration conf) {
+ return
HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(getHadoopConf()).build();
+ }
+
+ /**
+ * Get compaction Instant time.
+ * */
+ public static String getCompactionInstantTime(HoodieTableMetaClient
metaClient) {
+ Option<HoodieInstant> hoodieInstantOption =
metaClient.getCommitsTimeline().filterPendingExcludingCompaction().firstInstant();
+ if (hoodieInstantOption.isPresent()) {
+ HoodieInstant latestInstant = hoodieInstantOption.get();
+ String newCommitTime =
StreamerUtil.instantTimeSubtract(latestInstant.getTimestamp(), 1);
+ // Committed and pending compaction instants should have strictly lower
timestamps
+ List<HoodieInstant> conflictingInstants = metaClient.getActiveTimeline()
+
.getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
+ .filter(instant -> HoodieTimeline.compareTimestamps(
+ instant.getTimestamp(),
HoodieTimeline.GREATER_THAN_OR_EQUALS, newCommitTime))
+ .collect(Collectors.toList());
+ ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
+ "Following instants have timestamps >= compactionInstant (" +
newCommitTime + ") Instants :"
+ + conflictingInstants);
+ return newCommitTime;
+ } else {
+ return HoodieActiveTimeline.createNewInstantTime();
+ }
+ }
+
+ /**
+ * Inferences the deserialization Avro schema from the table schema (e.g.
the DDL)
+ * if both options {@link FlinkOptions#READ_AVRO_SCHEMA_PATH} and
+ * {@link FlinkOptions#READ_AVRO_SCHEMA} are not specified.
+ *
+ * @param conf The configuration
+ */
+ public static void setAvroSchema(Configuration conf) throws Exception {
+ org.apache.hadoop.conf.Configuration hadoopConf = getHadoopConf();
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+ .setConf(hadoopConf)
+ .setBasePath(conf.getOptional(FlinkOptions.PATH).get()).build();
+ TableSchemaResolver tableSchemaResolver = new
TableSchemaResolver(metaClient);
+ Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
+ conf.setString(FlinkOptions.READ_AVRO_SCHEMA, tableAvroSchema.toString());
+ }
+
+ /**
+ * Keep to avoid to much modifications.
+ */
+ public static org.apache.hadoop.conf.Configuration getHadoopConf() {
+ return FlinkClientUtil.getHadoopConf();
Review comment:
Remove it and use `FlinkClientUtil.getHadoopConf()` directly.
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSource.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Flink hudi compaction source function; this class will be built as a Flink
source operator to do Flink hudi compaction.
+ * In this class, a reasonable compaction time will be calculated and the
compaction plan will be built and passed to the downstream operator.
+ */
+public class CompactionCommitSource extends AbstractRichFunction implements
SourceFunction<CompactionPlanEvent> {
+
+ protected static final Logger LOG =
LoggerFactory.getLogger(CompactionCommitSource.class);
+
+ /**
+ * Config options.
+ */
+ private final Configuration conf;
+
+ /**
+ * Write client.
+ */
+ private transient HoodieFlinkWriteClient writeClient;
+
+ /**
+ * The compaction plan to execute.
+ */
+ private HoodieCompactionPlan compactionPlan;
+
+ /**
+ * Compaction instant time.
+ */
+ private String compactionInstantTime;
+
+ public CompactionCommitSource(Configuration conf, String
compactionInstantTime, HoodieCompactionPlan compactionPlan) {
+ this.conf = conf;
+ this.compactionInstantTime = compactionInstantTime;
+ this.compactionPlan = compactionPlan;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ this.writeClient = StreamerUtil.createWriteClient(conf, null);
+ }
+
+ @Override
+ public void run(SourceContext sourceContext) throws Exception {
+ HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+ HoodieInstant instant =
HoodieTimeline.getCompactionRequestedInstant(this.compactionInstantTime);
+ HoodieTimeline pendingCompactionTimeline =
table.getActiveTimeline().filterPendingCompactionTimeline();
+ if (!pendingCompactionTimeline.containsInstant(instant)) {
+ // This means that the compaction plan was written to the auxiliary
path (.tmp)
+ // but not the meta path (.hoodie); this usually happens when the job
crashes
+ // exceptionally.
+
+ // Clean up the compaction plan in the auxiliary path and cancel the
compaction.
+
+ LOG.warn("The compaction plan was fetched through the auxiliary
path(.tmp) but not the meta path(.hoodie).\n"
+ + "Clean the compaction plan in auxiliary path and cancels the
compaction");
+ cleanInstant(table.getMetaClient(), instant);
+ return;
+ }
+
+ // Mark instant as compaction inflight
+ table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
+ table.getMetaClient().reloadActiveTimeline();
+
+ List<CompactionOperation> operations =
this.compactionPlan.getOperations().stream()
+
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
+ LOG.info("CompactionPlanFunction compacting " + operations + " files");
+ for (CompactionOperation operation : operations) {
+ sourceContext.collect(new CompactionPlanEvent(compactionInstantTime,
operation));
+ }
+ }
+
+ private void cleanInstant(HoodieTableMetaClient metaClient, HoodieInstant
instant) {
+ Path commitFilePath = new Path(metaClient.getMetaAuxiliaryPath(),
instant.getFileName());
+ try {
+ if (metaClient.getFs().exists(commitFilePath)) {
+ boolean deleted = metaClient.getFs().delete(commitFilePath, false);
+ if (deleted) {
+ LOG.info("Removed instant " + instant);
+ } else {
+ throw new HoodieIOException("Could not delete instant " + instant);
+ }
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Could not remove requested commit " +
commitFilePath, e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ writeClient.close();
+ }
+
+ @Override
+ public void cancel() {
+
Review comment:
Useless.
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
##########
@@ -121,4 +125,35 @@
@Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks
that do actual write, default is 4.")
public Integer writeTaskNum = 4;
+
+ /**
+ * Transforms a {@code HoodieFlinkStreamer.Config} into {@code
Configuration}.
+ * The latter is more suitable for the table APIs. It reads all the
properties
+ * in the properties file (set by `--props` option) and cmd line options
+ * (set by `--hoodie-conf` option).
+ */
+ @SuppressWarnings("unchecked, rawtypes")
+ public static org.apache.flink.configuration.Configuration
fromStreamerConfig(FlinkStreamerConfig config) {
+ Map<String, String> propsMap = new HashMap<String, String>((Map)
StreamerUtil.getProps(config));
Review comment:
toFlinkConfig
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for Flink Hudi Compaction.
+ */
+public class CompactionUtil {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CompactionUtil.class);
+
+ /**
+ * Create the metaClient.
+ * */
+ public static HoodieTableMetaClient createMetaClient(Configuration conf) {
+ return
HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(getHadoopConf()).build();
+ }
+
+ /**
+ * Gets the compaction instant time.
+ */
+ public static String getCompactionInstantTime(HoodieTableMetaClient
metaClient) {
+ Option<HoodieInstant> hoodieInstantOption =
metaClient.getCommitsTimeline().filterPendingExcludingCompaction().firstInstant();
+ if (hoodieInstantOption.isPresent()) {
+ HoodieInstant latestInstant = hoodieInstantOption.get();
+ String newCommitTime =
StreamerUtil.instantTimeSubtract(latestInstant.getTimestamp(), 1);
+ // Committed and pending compaction instants should have strictly lower
timestamps
+ List<HoodieInstant> conflictingInstants = metaClient.getActiveTimeline()
+
.getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
+ .filter(instant -> HoodieTimeline.compareTimestamps(
+ instant.getTimestamp(),
HoodieTimeline.GREATER_THAN_OR_EQUALS, newCommitTime))
+ .collect(Collectors.toList());
+ ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
+ "Following instants have timestamps >= compactionInstant (" +
newCommitTime + ") Instants :"
+ + conflictingInstants);
+ return newCommitTime;
+ } else {
+ return HoodieActiveTimeline.createNewInstantTime();
+ }
+ }
+
+ /**
+ * Infers the deserialization Avro schema from the table schema (e.g.
the DDL)
+ * if both options {@link FlinkOptions#READ_AVRO_SCHEMA_PATH} and
+ * {@link FlinkOptions#READ_AVRO_SCHEMA} are not specified.
+ *
Review comment:
```java
/**
* Sets up the avro schema string into the give configuration {@code conf}
* through reading from the hoodie table metadata.
*/
```
##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -335,4 +336,19 @@ public static String instantTimePlus(String oldInstant,
long milliseconds) {
long oldTime = Long.parseLong(oldInstant);
return String.valueOf(oldTime + milliseconds);
}
+
+ /**
+ * Subtracts the given milliseconds from the old instant time and returns the result.
+ */
+ public static String instantTimeSubtract(String oldInstant, long
milliseconds) {
+ long oldTime = Long.parseLong(oldInstant);
+ return String.valueOf(oldTime - milliseconds);
+ }
+
+ /**
+ * Copied from Objects#equal.
+ */
+ public static boolean equal(@Nullable Object a, @Nullable Object b) {
Review comment:
Remove this method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]