This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ca9bfa2 [HUDI-2332] Add clustering and compaction in Kafka Connect Sink (#3857)
ca9bfa2 is described below
commit ca9bfa2a4000575dbaa379c91898786f040a9917
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Nov 23 00:53:28 2021 -0800
[HUDI-2332] Add clustering and compaction in Kafka Connect Sink (#3857)
* [HUDI-2332] Add clustering and compaction in Kafka Connect Sink
* Disable validation check on instant time for compaction and adjust configs
* Add javadocs
* Add clustering and compaction config
* Fix transaction causing missing records in the target table
* Add debugging logs
* Fix kafka offset sync in participant
* Adjust how clustering and compaction are configured in kafka-connect
* Fix clustering strategy
* Remove irrelevant changes from other published PRs
* Update clustering logic and others
* Update README
* Fix test failures
* Fix indentation
* Fix clustering config
* Add JavaCustomColumnsSortPartitioner and make async compaction enabled by default
* Add test for JavaCustomColumnsSortPartitioner
* Add more changes after IDE sync
* Update README with clarification
* Fix clustering logic after rebasing
* Remove unrelated changes
---
.../apache/hudi/config/HoodieClusteringConfig.java | 48 +++-
.../org/apache/hudi/config/HoodieWriteConfig.java | 3 +-
.../cluster/BaseClusteringPlanActionExecutor.java | 34 ++-
.../compact/ScheduleCompactionActionExecutor.java | 16 +-
.../apache/hudi/config/TestHoodieWriteConfig.java | 47 +++-
.../JavaRecentDaysClusteringPlanStrategy.java} | 42 ++--
.../JavaSizeBasedClusteringPlanStrategy.java | 131 +++++++++++
.../run/strategy/JavaExecutionStrategy.java | 242 +++++++++++++++++++++
.../strategy/JavaSortAndSizeExecutionStrategy.java | 70 ++++++
.../JavaCustomColumnsSortPartitioner.java | 62 ++++++
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 79 ++++++-
.../hudi/table/HoodieJavaMergeOnReadTable.java | 22 ++
.../org/apache/hudi/table/HoodieJavaTable.java | 9 +
.../cluster/JavaClusteringPlanActionExecutor.java | 43 ++++
.../JavaExecuteClusteringCommitActionExecutor.java | 123 +++++++++++
.../commit/BaseJavaCommitActionExecutor.java | 3 +-
.../HoodieJavaMergeOnReadTableCompactor.java | 56 +++++
.../TestJavaBulkInsertInternalPartitioner.java | 98 +++++++++
.../SparkRecentDaysClusteringPlanStrategy.java | 2 +
.../RDDCustomColumnsSortPartitioner.java | 30 +--
.../cluster/SparkClusteringPlanActionExecutor.java | 37 ----
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 58 ++++-
hudi-kafka-connect/README.md | 132 ++++++++++-
hudi-kafka-connect/demo/config-sink.json | 1 +
.../hudi/connect/writers/KafkaConnectConfigs.java | 9 +
.../writers/KafkaConnectTransactionServices.java | 29 ++-
.../org/apache/hudi/utilities/HoodieCompactor.java | 2 +-
27 files changed, 1316 insertions(+), 112 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 6c4a6d8..4476e4f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -22,7 +22,9 @@ import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
import java.io.File;
import java.io.FileReader;
@@ -41,6 +43,14 @@ public class HoodieClusteringConfig extends HoodieConfig {
// Any strategy specific params can be saved with this prefix
public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
+  public static final String SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
+      "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
+  public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
+      "org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
+  public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
+      "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
+  public static final String JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY =
+      "org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy";
// Any Space-filling curves optimize(z-order/hilbert) params can be saved with this prefix
public static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = "hoodie.layout.optimize.";
@@ -59,7 +69,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
public static final ConfigProperty<String> PLAN_STRATEGY_CLASS_NAME = ConfigProperty
    .key("hoodie.clustering.plan.strategy.class")
-    .defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy")
+    .defaultValue(SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY)
    .sinceVersion("0.7.0")
    .withDocumentation("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan "
        + "i.e select what file groups are being clustered. Default strategy, looks at the clustering small file size limit (determined by "
@@ -67,7 +77,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
public static final ConfigProperty<String> EXECUTION_STRATEGY_CLASS_NAME = ConfigProperty
    .key("hoodie.clustering.execution.strategy.class")
-    .defaultValue("org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
+    .defaultValue(SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY)
    .sinceVersion("0.7.0")
    .withDocumentation("Config to provide a strategy class (subclass of RunClusteringStrategy) to define how the "
        + " clustering plan is executed. By default, we sort the file groups in th plan by the specified columns, while "
@@ -336,6 +346,12 @@ public class HoodieClusteringConfig extends HoodieConfig {
public static class Builder {
private final HoodieClusteringConfig clusteringConfig = new HoodieClusteringConfig();
+ private EngineType engineType = EngineType.SPARK;
+
+ public Builder withEngineType(EngineType engineType) {
+ this.engineType = engineType;
+ return this;
+ }
public Builder fromFile(File propertiesFile) throws IOException {
try (FileReader reader = new FileReader(propertiesFile)) {
@@ -455,9 +471,37 @@ public class HoodieClusteringConfig extends HoodieConfig {
}
public HoodieClusteringConfig build() {
+      clusteringConfig.setDefaultValue(
+          PLAN_STRATEGY_CLASS_NAME, getDefaultPlanStrategyClassName(engineType));
+      clusteringConfig.setDefaultValue(
+          EXECUTION_STRATEGY_CLASS_NAME, getDefaultExecutionStrategyClassName(engineType));
clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName());
return clusteringConfig;
}
+
+ private String getDefaultPlanStrategyClassName(EngineType engineType) {
+ switch (engineType) {
+ case SPARK:
+ return SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+ case FLINK:
+ case JAVA:
+ return JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+ default:
+        throw new HoodieNotSupportedException("Unsupported engine " + engineType);
+ }
+ }
+
+  private String getDefaultExecutionStrategyClassName(EngineType engineType) {
+ switch (engineType) {
+ case SPARK:
+ return SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY;
+ case FLINK:
+ case JAVA:
+ return JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY;
+ default:
+        throw new HoodieNotSupportedException("Unsupported engine " + engineType);
+ }
+ }
}
/**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d496610..17386e9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2182,7 +2182,8 @@ public class HoodieWriteConfig extends HoodieConfig {
    writeConfig.setDefaultOnCondition(!isCompactionConfigSet,
        HoodieCompactionConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
    writeConfig.setDefaultOnCondition(!isClusteringConfigSet,
-        HoodieClusteringConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
+        HoodieClusteringConfig.newBuilder().withEngineType(engineType)
+            .fromProperties(writeConfig.getProps()).build());
    writeConfig.setDefaultOnCondition(!isMetricsConfigSet, HoodieMetricsConfig.newBuilder().fromProperties(
        writeConfig.getProps()).build());
    writeConfig.setDefaultOnCondition(!isBootstrapConfigSet,
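
For reference, a minimal sketch (not part of this commit's diff) of how the engine-aware defaults introduced above are expected to resolve; it mirrors the new TestHoodieWriteConfig cases later in this patch and only uses APIs shown in this diff:

import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.config.HoodieWriteConfig;

public class EngineAwareClusteringDefaultsSketch {
  public static void main(String[] args) {
    // With the Java engine, the builder falls back to the Java clustering strategies
    // (see getDefaultPlanStrategyClassName/getDefaultExecutionStrategyClassName above).
    HoodieWriteConfig javaConfig = HoodieWriteConfig.newBuilder()
        .withEngineType(EngineType.JAVA)
        .withPath("/tmp")
        .build();
    System.out.println(javaConfig.getClusteringPlanStrategyClass());
    // org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy
    System.out.println(javaConfig.getClusteringExecutionStrategyClass());
    // org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy

    // Without withEngineType(...), Spark remains the default engine and the Spark strategies apply.
    HoodieWriteConfig defaultConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build();
    System.out.println(defaultConfig.getClusteringPlanStrategyClass());
    // org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy
  }
}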
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
index 97407e3..8071bfb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
@@ -27,10 +27,15 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
@@ -38,6 +43,8 @@ import java.util.Map;
public abstract class BaseClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O>
    extends BaseActionExecutor<T, I, K, O, Option<HoodieClusteringPlan>> {
+  private static final Logger LOG = LogManager.getLogger(BaseClusteringPlanActionExecutor.class);
+
private final Option<Map<String, String>> extraMetadata;
public BaseClusteringPlanActionExecutor(HoodieEngineContext context,
@@ -49,7 +56,32 @@ public abstract class BaseClusteringPlanActionExecutor<T extends HoodieRecordPay
this.extraMetadata = extraMetadata;
}
- protected abstract Option<HoodieClusteringPlan> createClusteringPlan();
+  protected Option<HoodieClusteringPlan> createClusteringPlan() {
+    LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
+    Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant();
+
+    int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
+        .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
+        .countInstants();
+    if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
+      LOG.info("Not scheduling inline clustering as only " + commitsSinceLastClustering
+          + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
+          + config.getInlineClusterMaxCommits());
+      return Option.empty();
+    }
+
+    if (config.isAsyncClusteringEnabled() && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {
+      LOG.info("Not scheduling async clustering as only " + commitsSinceLastClustering
+          + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
+          + config.getAsyncClusterMaxCommits());
+      return Option.empty();
+    }
+
+    LOG.info("Generating clustering plan for table " + config.getBasePath());
+    ClusteringPlanStrategy strategy = (ClusteringPlanStrategy)
+        ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), table, context, config);
+    return strategy.generateClusteringPlan();
+  }
@Override
public Option<HoodieClusteringPlan> execute() {
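
The gating logic in createClusteringPlan() above reduces to a simple threshold check; a self-contained sketch (assumed names and values, not Hudi API) that illustrates when scheduling is deferred:

/** Standalone illustration of the commit-count gate used by createClusteringPlan() (hypothetical names). */
public class ClusteringScheduleGateSketch {
  static boolean shouldScheduleClustering(int commitsSinceLastClustering, int maxCommitsBeforeClustering) {
    // Mirrors the check above: skip when the configured max commits is still greater than
    // the number of commits since the last completed clustering.
    return commitsSinceLastClustering >= maxCommitsBeforeClustering;
  }

  public static void main(String[] args) {
    System.out.println(shouldScheduleClustering(2, 4)); // false: too few commits yet, Option.empty() is returned
    System.out.println(shouldScheduleClustering(4, 4)); // true: a clustering plan is generated
  }
}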
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 87c5a7c..5adaf5a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -68,12 +69,15 @@ public class ScheduleCompactionActionExecutor<T extends HoodieRecordPayload, I,
  public Option<HoodieCompactionPlan> execute() {
    if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
        && !config.getFailedWritesCleanPolicy().isLazy()) {
-      // if there are inflight writes, their instantTime must not be less than that of compaction instant time
-      table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
-          .ifPresent(earliestInflight -> ValidationUtils.checkArgument(
-              HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
-              "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
-                  + ", Compaction scheduled at " + instantTime));
+      // TODO(yihua): this validation is removed for Java client used by kafka-connect. Need to revisit this.
+      if (config.getEngineType() != EngineType.JAVA) {
+        // if there are inflight writes, their instantTime must not be less than that of compaction instant time
+        table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
+            .ifPresent(earliestInflight -> ValidationUtils.checkArgument(
+                HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
+                "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+                    + ", Compaction scheduled at " + instantTime));
+      }
      // Committed and pending compaction instants should have strictly lower timestamps
      List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
          .getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index ed6b9e6..1df71e8 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -20,7 +20,6 @@ package org.apache.hudi.config;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.config.HoodieWriteConfig.Builder;
-
import org.apache.hudi.index.HoodieIndex;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -81,6 +80,52 @@ public class TestHoodieWriteConfig {
assertEquals(HoodieIndex.IndexType.INMEMORY, writeConfig.getIndexType());
}
+  @Test
+  public void testDefaultClusteringPlanStrategyClassAccordingToEngineType() {
+    // Default (as Spark)
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build();
+    assertEquals(
+        HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
+        writeConfig.getClusteringPlanStrategyClass());
+
+    // Spark
+    writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build();
+    assertEquals(
+        HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
+        writeConfig.getClusteringPlanStrategyClass());
+
+    // Flink and Java
+    for (EngineType engineType : new EngineType[] {EngineType.FLINK, EngineType.JAVA}) {
+      writeConfig = HoodieWriteConfig.newBuilder().withEngineType(engineType).withPath("/tmp").build();
+      assertEquals(
+          HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
+          writeConfig.getClusteringPlanStrategyClass());
+    }
+  }
+
+  @Test
+  public void testDefaultClusteringExecutionStrategyClassAccordingToEngineType() {
+    // Default (as Spark)
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build();
+    assertEquals(
+        HoodieClusteringConfig.SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY,
+        writeConfig.getClusteringExecutionStrategyClass());
+
+    // Spark
+    writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build();
+    assertEquals(
+        HoodieClusteringConfig.SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY,
+        writeConfig.getClusteringExecutionStrategyClass());
+
+    // Flink and Java
+    for (EngineType engineType : new EngineType[] {EngineType.FLINK, EngineType.JAVA}) {
+      writeConfig = HoodieWriteConfig.newBuilder().withEngineType(engineType).withPath("/tmp").build();
+      assertEquals(
+          HoodieClusteringConfig.JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY,
+          writeConfig.getClusteringExecutionStrategyClass());
+    }
+  }
+
  private ByteArrayOutputStream saveParamsIntoOutputStream(Map<String, String> params) throws IOException {
Properties properties = new Properties();
properties.putAll(params);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java
similarity index 52%
copy from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
copy to hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java
index 5c13277..6d9b2ee 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java
@@ -7,22 +7,24 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.hudi.client.clustering.plan.strategy;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
-import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
+import org.apache.hudi.table.HoodieJavaCopyOnWriteTable;
+import org.apache.hudi.table.HoodieJavaMergeOnReadTable;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -31,24 +33,26 @@ import java.util.List;
import java.util.stream.Collectors;
/**
- * Clustering Strategy that only looks at latest
'daybased.lookback.partitions' partitions.
+ * Clustering Strategy that only looks at latest
'daybased.lookback.partitions' partitions
+ * for Java engine.
*/
-public class SparkRecentDaysClusteringPlanStrategy<T extends
HoodieRecordPayload<T>>
- extends SparkSizeBasedClusteringPlanStrategy<T> {
- private static final Logger LOG =
LogManager.getLogger(SparkRecentDaysClusteringPlanStrategy.class);
+public class JavaRecentDaysClusteringPlanStrategy<T extends
HoodieRecordPayload<T>>
+ extends JavaSizeBasedClusteringPlanStrategy<T> {
+ private static final Logger LOG =
LogManager.getLogger(JavaRecentDaysClusteringPlanStrategy.class);
- public SparkRecentDaysClusteringPlanStrategy(HoodieSparkCopyOnWriteTable<T>
table,
- HoodieSparkEngineContext
engineContext,
- HoodieWriteConfig writeConfig) {
+ public JavaRecentDaysClusteringPlanStrategy(HoodieJavaCopyOnWriteTable<T>
table,
+ HoodieJavaEngineContext
engineContext,
+ HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
- public SparkRecentDaysClusteringPlanStrategy(HoodieSparkMergeOnReadTable<T>
table,
- HoodieSparkEngineContext
engineContext,
- HoodieWriteConfig writeConfig) {
+ public JavaRecentDaysClusteringPlanStrategy(HoodieJavaMergeOnReadTable<T>
table,
+ HoodieJavaEngineContext
engineContext,
+ HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
+ @Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
int targetPartitionsForClustering =
getWriteConfig().getTargetPartitionsForClustering();
int skipPartitionsFromLatestForClustering =
getWriteConfig().getSkipPartitionsFromLatestForClustering();
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
new file mode 100644
index 0000000..9052f03
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieJavaCopyOnWriteTable;
+import org.apache.hudi.table.HoodieJavaMergeOnReadTable;
+import
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
+/**
+ * Clustering Strategy for Java engine based on following.
+ * 1) Creates clustering groups based on max size allowed per group.
+ * 2) Excludes files that are greater than 'small.file.limit' from clustering
plan.
+ */
+public class JavaSizeBasedClusteringPlanStrategy<T extends
HoodieRecordPayload<T>>
+ extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>> {
+ private static final Logger LOG =
LogManager.getLogger(JavaSizeBasedClusteringPlanStrategy.class);
+
+ public JavaSizeBasedClusteringPlanStrategy(HoodieJavaCopyOnWriteTable<T>
table,
+ HoodieJavaEngineContext
engineContext,
+ HoodieWriteConfig writeConfig) {
+ super(table, engineContext, writeConfig);
+ }
+
+ public JavaSizeBasedClusteringPlanStrategy(HoodieJavaMergeOnReadTable<T>
table,
+ HoodieJavaEngineContext
engineContext,
+ HoodieWriteConfig writeConfig) {
+ super(table, engineContext, writeConfig);
+ }
+
+ @Override
+ protected Stream<HoodieClusteringGroup>
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice>
fileSlices) {
+ List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
+ List<FileSlice> currentGroup = new ArrayList<>();
+ long totalSizeSoFar = 0;
+ HoodieWriteConfig writeConfig = getWriteConfig();
+ for (FileSlice currentSlice : fileSlices) {
+ // assume each filegroup size is ~= parquet.max.file.size
+ totalSizeSoFar += currentSlice.getBaseFile().isPresent() ?
currentSlice.getBaseFile().get().getFileSize() :
writeConfig.getParquetMaxFileSize();
+ // check if max size is reached and create new group, if needed.
+ if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() &&
!currentGroup.isEmpty()) {
+ int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar,
writeConfig.getClusteringTargetFileMaxBytes());
+ LOG.info("Adding one clustering group " + totalSizeSoFar + " max
bytes: "
+ + writeConfig.getClusteringMaxBytesInGroup() + " num input
slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+ fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+ currentGroup = new ArrayList<>();
+ totalSizeSoFar = 0;
+ }
+ currentGroup.add(currentSlice);
+ // totalSizeSoFar could be 0 when new group was created in the previous
conditional block.
+ // reset to the size of current slice, otherwise the number of output
file group will become 0 even though current slice is present.
+ if (totalSizeSoFar == 0) {
+ totalSizeSoFar += currentSlice.getBaseFile().isPresent() ?
currentSlice.getBaseFile().get().getFileSize() :
writeConfig.getParquetMaxFileSize();
+ }
+ }
+ if (!currentGroup.isEmpty()) {
+ int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar,
writeConfig.getClusteringTargetFileMaxBytes());
+ LOG.info("Adding final clustering group " + totalSizeSoFar + " max
bytes: "
+ + writeConfig.getClusteringMaxBytesInGroup() + " num input
slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+ fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+ }
+
+ return fileSliceGroups.stream().map(fileSliceGroup ->
HoodieClusteringGroup.newBuilder()
+ .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
+ .setNumOutputFileGroups(fileSliceGroup.getRight())
+ .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
+ .build());
+ }
+
+ @Override
+ protected Map<String, String> getStrategyParams() {
+ Map<String, String> params = new HashMap<>();
+ if
(!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
+ params.put(PLAN_STRATEGY_SORT_COLUMNS.key(),
getWriteConfig().getClusteringSortColumns());
+ }
+ return params;
+ }
+
+ @Override
+ protected List<String> filterPartitionPaths(List<String> partitionPaths) {
+ return partitionPaths;
+ }
+
+ @Override
+ protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String
partition) {
+ return super.getFileSlicesEligibleForClustering(partition)
+ // Only files that have basefile size smaller than small file size are
eligible.
+ .filter(slice ->
slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) <
getWriteConfig().getClusteringSmallFileLimit());
+ }
+
+ private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize)
{
+ return (int) Math.ceil(groupSize / (double) targetFileSize);
+ }
+}
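
The grouping in buildClusteringGroupsForPartition above accumulates file slices until the per-group byte limit is hit and then sizes the output as ceil(groupSize / targetFileSize); a self-contained sketch of that arithmetic (assumed sizes and limits, not Hudi API):

import java.util.Arrays;
import java.util.List;

/** Standalone illustration of the size-based grouping above (assumed values). */
public class SizeBasedGroupingSketch {
  public static void main(String[] args) {
    long maxBytesPerGroup = 300L * 1024 * 1024;    // cf. hoodie.clustering.plan.strategy.max.bytes.per.group
    long targetFileMaxBytes = 128L * 1024 * 1024;  // cf. hoodie.clustering.plan.strategy.target.file.max.bytes
    List<Long> sliceSizes = Arrays.asList(90L << 20, 120L << 20, 110L << 20, 95L << 20);

    int slicesInGroup = 0;
    long totalSoFar = 0;
    for (long size : sliceSizes) {
      totalSoFar += size;
      if (totalSoFar >= maxBytesPerGroup && slicesInGroup > 0) {
        // Close the current group; output file group count is ceil(groupSize / targetFileSize).
        System.out.println("group of " + slicesInGroup + " slices -> "
            + (int) Math.ceil(totalSoFar / (double) targetFileMaxBytes) + " output file groups");
        slicesInGroup = 0;
        totalSoFar = size;  // reset to the current slice, as in the strategy above
      }
      slicesInGroup++;
    }
    if (slicesInGroup > 0) {
      System.out.println("final group of " + slicesInGroup + " slices -> "
          + (int) Math.ceil(totalSoFar / (double) targetFileMaxBytes) + " output file groups");
    }
  }
}

With the assumed 90/120/110/95 MB slices this prints a group of 2 slices split into 3 output file groups, then a final group of 2 slices split into 2.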
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
new file mode 100644
index 0000000..c830925
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.clustering.run.strategy;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.JavaTaskContextSupplier;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.RewriteAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
+import static
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
+/**
+ * Clustering strategy for Java engine.
+ */
+public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
+ extends ClusteringExecutionStrategy<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>> {
+
+ private static final Logger LOG =
LogManager.getLogger(JavaExecutionStrategy.class);
+
+ public JavaExecutionStrategy(
+ HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig
writeConfig) {
+ super(table, engineContext, writeConfig);
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> performClustering(
+ HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) {
+ // execute clustering for each group and collect WriteStatus
+ List<WriteStatus> writeStatusList = new ArrayList<>();
+ clusteringPlan.getInputGroups().forEach(
+ inputGroup -> writeStatusList.addAll(runClusteringForGroup(
+ inputGroup, clusteringPlan.getStrategy().getStrategyParams(),
+
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
+ instantTime)));
+ HoodieWriteMetadata<List<WriteStatus>> writeMetadata = new
HoodieWriteMetadata<>();
+ writeMetadata.setWriteStatuses(writeStatusList);
+ return writeMetadata;
+ }
+
+ /**
+ * Execute clustering to write inputRecords into new files as defined by
rules in strategy parameters.
+ * The number of new file groups created is bounded by numOutputGroups.
+ * Note that commit is not done as part of strategy. commit is callers
responsibility.
+ *
+ * @param inputRecords List of {@link HoodieRecord}.
+ * @param numOutputGroups Number of output file groups.
+ * @param instantTime Clustering (replace commit) instant time.
+ * @param strategyParams Strategy parameters containing columns to
sort the data by when clustering.
+ * @param schema Schema of the data including metadata
fields.
+ * @param fileGroupIdList File group id corresponding to each out
group.
+ * @param preserveHoodieMetadata Whether to preserve commit metadata while
clustering.
+ * @return List of {@link WriteStatus}.
+ */
+ public abstract List<WriteStatus> performClusteringWithRecordList(
+ final List<HoodieRecord<T>> inputRecords, final int numOutputGroups,
final String instantTime,
+ final Map<String, String> strategyParams, final Schema schema,
+ final List<HoodieFileGroupId> fileGroupIdList, final boolean
preserveHoodieMetadata);
+
+ /**
+ * Create {@link BulkInsertPartitioner} based on strategy params.
+ *
+ * @param strategyParams Strategy parameters containing columns to sort the
data by when clustering.
+ * @param schema Schema of the data including metadata fields.
+ * @return empty for now.
+ */
+ protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String,
String> strategyParams, Schema schema) {
+ if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
+ return Option.of(new JavaCustomColumnsSortPartitioner(
+ strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
+ HoodieAvroUtils.addMetadataFields(schema)));
+ } else {
+ return Option.empty();
+ }
+ }
+
+ /**
+ * Executes clustering for the group.
+ */
+ private List<WriteStatus> runClusteringForGroup(
+ HoodieClusteringGroup clusteringGroup, Map<String, String>
strategyParams,
+ boolean preserveHoodieMetadata, String instantTime) {
+ List<HoodieRecord<T>> inputRecords = readRecordsForGroup(clusteringGroup,
instantTime);
+ Schema readerSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(getWriteConfig().getSchema()));
+ List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
+ .map(info -> new HoodieFileGroupId(info.getPartitionPath(),
info.getFileId()))
+ .collect(Collectors.toList());
+ return performClusteringWithRecordList(inputRecords,
clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams,
readerSchema, inputFileIds, preserveHoodieMetadata);
+ }
+
+ /**
+ * Get a list of all records for the group. This includes all records from
file slice
+ * (Apply updates from log files, if any).
+ */
+ private List<HoodieRecord<T>> readRecordsForGroup(HoodieClusteringGroup
clusteringGroup, String instantTime) {
+ List<ClusteringOperation> clusteringOps =
clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
+ boolean hasLogFiles = clusteringOps.stream().anyMatch(op ->
op.getDeltaFilePaths().size() > 0);
+ if (hasLogFiles) {
+ // if there are log files, we read all records into memory for a file
group and apply updates.
+ return readRecordsForGroupWithLogs(clusteringOps, instantTime);
+ } else {
+ // We want to optimize reading records for case there are no log files.
+ return readRecordsForGroupBaseFiles(clusteringOps);
+ }
+ }
+
+ /**
+ * Read records from baseFiles and apply updates.
+ */
+ private List<HoodieRecord<T>>
readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps,
+ String
instantTime) {
+ HoodieWriteConfig config = getWriteConfig();
+ HoodieTable table = getHoodieTable();
+ List<HoodieRecord<T>> records = new ArrayList<>();
+
+ clusteringOps.forEach(clusteringOp -> {
+ long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new
JavaTaskContextSupplier(), config);
+ LOG.info("MaxMemoryPerCompaction run as part of clustering => " +
maxMemoryPerCompaction);
+ try {
+ Schema readerSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(config.getSchema()));
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(table.getMetaClient().getFs())
+ .withBasePath(table.getMetaClient().getBasePath())
+ .withLogFilePaths(clusteringOp.getDeltaFilePaths())
+ .withReaderSchema(readerSchema)
+ .withLatestInstantTime(instantTime)
+ .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+ .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
+ .withReverseReader(config.getCompactionReverseLogReadEnabled())
+ .withBufferSize(config.getMaxDFSStreamBufferSize())
+ .withSpillableMapBasePath(config.getSpillableMapBasePath())
+ .withPartition(clusteringOp.getPartitionPath())
+ .build();
+
+ Option<HoodieFileReader> baseFileReader =
StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
+ ? Option.empty()
+ :
Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new
Path(clusteringOp.getDataFilePath())));
+ HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+ Iterator<HoodieRecord<T>> fileSliceReader =
getFileSliceReader(baseFileReader, scanner, readerSchema,
+ tableConfig.getPayloadClass(),
+ tableConfig.getPreCombineField(),
+ tableConfig.populateMetaFields() ? Option.empty() :
Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
+ tableConfig.getPartitionFieldProp())));
+ fileSliceReader.forEachRemaining(records::add);
+ } catch (IOException e) {
+ throw new HoodieClusteringException("Error reading input data for " +
clusteringOp.getDataFilePath()
+ + " and " + clusteringOp.getDeltaFilePaths(), e);
+ }
+ });
+ return records;
+ }
+
+ /**
+ * Read records from baseFiles.
+ */
+ private List<HoodieRecord<T>>
readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
+ List<HoodieRecord<T>> records = new ArrayList<>();
+ clusteringOps.forEach(clusteringOp -> {
+ try {
+ Schema readerSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(getWriteConfig().getSchema()));
+ HoodieFileReader<IndexedRecord> baseFileReader =
HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new
Path(clusteringOp.getDataFilePath()));
+ Iterator<IndexedRecord> recordIterator =
baseFileReader.getRecordIterator(readerSchema);
+ recordIterator.forEachRemaining(record ->
records.add(transform(record)));
+ } catch (IOException e) {
+ throw new HoodieClusteringException("Error reading input data for " +
clusteringOp.getDataFilePath()
+ + " and " + clusteringOp.getDeltaFilePaths(), e);
+ }
+ });
+ return records;
+ }
+
+ /**
+ * Transform IndexedRecord into HoodieRecord.
+ */
+ private HoodieRecord<T> transform(IndexedRecord indexedRecord) {
+ GenericRecord record = (GenericRecord) indexedRecord;
+ Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
+ String key = KeyGenUtils.getRecordKeyFromGenericRecord(record,
keyGeneratorOpt);
+ String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record,
keyGeneratorOpt);
+ HoodieKey hoodieKey = new HoodieKey(key, partition);
+
+ HoodieRecordPayload avroPayload = new RewriteAvroPayload(record);
+ HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload);
+ return hoodieRecord;
+ }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
new file mode 100644
index 0000000..a33af7c
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.clustering.run.strategy;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.CreateHandleFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.JavaBulkInsertHelper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Clustering Strategy based on following.
+ * 1) Java execution engine.
+ * 2) Uses bulk_insert to write data into new files.
+ */
+public class JavaSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
+ extends JavaExecutionStrategy<T> {
+ private static final Logger LOG =
LogManager.getLogger(JavaSortAndSizeExecutionStrategy.class);
+
+ public JavaSortAndSizeExecutionStrategy(HoodieTable table,
+ HoodieEngineContext engineContext,
+ HoodieWriteConfig writeConfig) {
+ super(table, engineContext, writeConfig);
+ }
+
+ @Override
+ public List<WriteStatus> performClusteringWithRecordList(
+ final List<HoodieRecord<T>> inputRecords, final int numOutputGroups,
+ final String instantTime, final Map<String, String> strategyParams,
final Schema schema,
+ final List<HoodieFileGroupId> fileGroupIdList, final boolean
preserveHoodieMetadata) {
+ LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups
+ " commit:" + instantTime);
+ Properties props = getWriteConfig().getProps();
+ props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(),
String.valueOf(numOutputGroups));
+ // We are calling another action executor - disable auto commit. Strategy
is only expected to write data in new files.
+ props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(),
Boolean.FALSE.toString());
+ props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(),
String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
+ HoodieWriteConfig newConfig =
HoodieWriteConfig.newBuilder().withProps(props).build();
+ return (List<WriteStatus>)
JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime,
getHoodieTable(), newConfig,
+ false, getPartitioner(strategyParams, schema), true, numOutputGroups,
new CreateHandleFactory(preserveHoodieMetadata));
+ }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
new file mode 100644
index 0000000..bb7cd5e
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import org.apache.avro.Schema;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A partitioner that does sorting based on specified column values for Java
client.
+ *
+ * @param <T> HoodieRecordPayload type
+ */
+public class JavaCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
+ implements BulkInsertPartitioner<List<HoodieRecord<T>>> {
+
+ private final String[] sortColumnNames;
+ private final Schema schema;
+
+ public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema)
{
+ this.sortColumnNames = columnNames;
+ this.schema = schema;
+ }
+
+ @Override
+ public List<HoodieRecord<T>> repartitionRecords(
+ List<HoodieRecord<T>> records, int outputSparkPartitions) {
+ return records.stream().sorted((o1, o2) -> {
+ Object values1 = HoodieAvroUtils.getRecordColumnValues(o1,
sortColumnNames, schema);
+ Object values2 = HoodieAvroUtils.getRecordColumnValues(o2,
sortColumnNames, schema);
+ return values1.toString().compareTo(values2.toString());
+ }).collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean arePartitionRecordsSorted() {
+ return true;
+ }
+}
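
The partitioner above sorts the whole record list by comparing the string form of the extracted sort-column values; a self-contained sketch of that comparison (plain strings standing in for the column values, assumed data):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Standalone illustration of the lexicographic, whole-list sort used above (assumed values). */
public class ColumnValueSortSketch {
  public static void main(String[] args) {
    // Stand-ins for HoodieAvroUtils.getRecordColumnValues(record, sortColumnNames, schema).toString()
    List<String> columnValues = Arrays.asList("2021-11-20|driver-3", "2021-11-19|driver-1", "2021-11-21|driver-2");
    List<String> sorted = columnValues.stream()
        .sorted(String::compareTo)  // the partitioner compares the string form of the column values
        .collect(Collectors.toList());
    System.out.println(sorted);
    // [2021-11-19|driver-1, 2021-11-20|driver-3, 2021-11-21|driver-2]
  }
}

Because the comparison is on the string form, numeric sort columns order lexicographically rather than numerically.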
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 4107adb..cead7aa 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -29,6 +29,7 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -39,17 +40,24 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
-import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor;
+import org.apache.hudi.table.action.cluster.JavaClusteringPlanActionExecutor;
+import
org.apache.hudi.table.action.cluster.JavaExecuteClusteringCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaBulkInsertCommitActionExecutor;
import
org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor;
import
org.apache.hudi.table.action.commit.JavaInsertOverwriteCommitActionExecutor;
import
org.apache.hudi.table.action.commit.JavaInsertOverwriteTableCommitActionExecutor;
import
org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor;
+import org.apache.hudi.table.action.commit.JavaMergeHelper;
import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
import
org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
@@ -57,10 +65,20 @@ import
org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends
HoodieJavaTable<T> {
+public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload>
+ extends HoodieJavaTable<T> implements HoodieCompactionHandler<T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieJavaCopyOnWriteTable.class);
+
protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config,
HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
@@ -160,23 +178,23 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
  public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context,
                                                         String instantTime,
                                                         Option<Map<String, String>> extraMetadata) {
-    throw new HoodieNotSupportedException("ScheduleCompaction is not supported yet");
+    throw new HoodieNotSupportedException("ScheduleCompaction is not supported on a CopyOnWrite table");
  }
  @Override
  public HoodieWriteMetadata<List<WriteStatus>> compact(HoodieEngineContext context,
                                                        String compactionInstantTime) {
-    throw new HoodieNotSupportedException("Compact is not supported yet");
+    throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
  }
  @Override
  public Option<HoodieClusteringPlan> scheduleClustering(final HoodieEngineContext context, final String instantTime, final Option<Map<String, String>> extraMetadata) {
-    throw new HoodieNotSupportedException("Clustering is not supported yet");
+    return new JavaClusteringPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
  }
  @Override
  public HoodieWriteMetadata<List<WriteStatus>> cluster(final HoodieEngineContext context, final String clusteringInstantTime) {
-    throw new HoodieNotSupportedException("Clustering is not supported yet");
+    return new JavaExecuteClusteringCommitActionExecutor<>(context, config, this, clusteringInstantTime).execute();
  }
@Override
@@ -235,4 +253,53 @@ public class HoodieJavaCopyOnWriteTable<T extends
HoodieRecordPayload> extends H
return new CopyOnWriteRestoreActionExecutor(
context, config, this, restoreInstantTime, instantToRestore).execute();
}
+
+ @Override
+ public Iterator<List<WriteStatus>> handleUpdate(
+ String instantTime, String partitionPath, String fileId,
+ Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile)
+ throws IOException {
+ // these are updates
+ HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime,
partitionPath, fileId, keyToNewRecords, oldDataFile);
+ return handleUpdateInternal(upsertHandle, instantTime, fileId);
+ }
+
+ protected Iterator<List<WriteStatus>>
handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String
instantTime,
+ String fileId)
throws IOException {
+ if (upsertHandle.getOldFilePath() == null) {
+ throw new HoodieUpsertException(
+ "Error in finding the old file path at commit " + instantTime + "
for fileId: " + fileId);
+ } else {
+ JavaMergeHelper.newInstance().runMerge(this, upsertHandle);
+ }
+
+ // TODO(yihua): This needs to be revisited
+ if (upsertHandle.getPartitionPath() == null) {
+ LOG.info("Upsert Handle has partition path as null " +
upsertHandle.getOldFilePath() + ", "
+ + upsertHandle.writeStatuses());
+ }
+
+ return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
+ }
+
+ protected HoodieMergeHandle getUpdateHandle(String instantTime, String
partitionPath, String fileId,
+ Map<String, HoodieRecord<T>>
keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
+ if (requireSortedRecords()) {
+ return new HoodieSortedMergeHandle<>(config, instantTime, this,
keyToNewRecords, partitionPath, fileId,
+ dataFileToBeMerged, taskContextSupplier, Option.empty());
+ } else {
+ return new HoodieMergeHandle<>(config, instantTime, this,
keyToNewRecords, partitionPath, fileId,
+ dataFileToBeMerged, taskContextSupplier, Option.empty());
+ }
+ }
+
+ @Override
+ public Iterator<List<WriteStatus>> handleInsert(
+ String instantTime, String partitionPath, String fileId,
+ Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
+ HoodieCreateHandle<?, ?, ?, ?> createHandle =
+ new HoodieCreateHandle(config, instantTime, this, partitionPath,
fileId, recordMap, taskContextSupplier);
+ createHandle.write();
+ return Collections.singletonList(createHandle.close()).iterator();
+ }
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
index b219ba1..136c25b 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -29,9 +30,13 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import
org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
+import
org.apache.hudi.table.action.compact.HoodieJavaMergeOnReadTableCompactor;
+import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
+import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
import
org.apache.hudi.table.action.deltacommit.JavaUpsertPreppedDeltaCommitActionExecutor;
import java.util.List;
+import java.util.Map;
public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends
HoodieJavaCopyOnWriteTable<T> {
protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config,
HoodieEngineContext context, HoodieTableMetaClient metaClient) {
@@ -60,4 +65,21 @@ public class HoodieJavaMergeOnReadTable<T extends
HoodieRecordPayload> extends H
return new
JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context,
config,
this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
}
+
+ @Override
+ public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext
context, String instantTime, Option<Map<String, String>> extraMetadata) {
+ ScheduleCompactionActionExecutor scheduleCompactionExecutor = new
ScheduleCompactionActionExecutor(
+ context, config, this, instantTime, extraMetadata,
+ new HoodieJavaMergeOnReadTableCompactor());
+ return scheduleCompactionExecutor.execute();
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> compact(
+ HoodieEngineContext context, String compactionInstantTime) {
+ RunCompactionActionExecutor compactionExecutor = new
RunCompactionActionExecutor(
+ context, config, this, compactionInstantTime, new
HoodieJavaMergeOnReadTableCompactor(),
+ new HoodieJavaCopyOnWriteTable(config, context, getMetaClient()));
+ return convertMetadata(compactionExecutor.execute());
+ }
}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index 8b0a7a9..f9c7caf 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -31,9 +32,12 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.JavaHoodieIndexFactory;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import java.util.List;
+import static org.apache.hudi.common.data.HoodieList.getList;
+
public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>,
List<WriteStatus>> {
protected HoodieJavaTable(HoodieWriteConfig config, HoodieEngineContext
context, HoodieTableMetaClient metaClient) {
@@ -61,6 +65,11 @@ public abstract class HoodieJavaTable<T extends
HoodieRecordPayload>
}
}
+ public static HoodieWriteMetadata<List<WriteStatus>> convertMetadata(
+ HoodieWriteMetadata<HoodieData<WriteStatus>> metadata) {
+ return metadata.clone(getList(metadata.getWriteStatuses()));
+ }
+
@Override
protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext
context) {
return JavaHoodieIndexFactory.createIndex(config);
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaClusteringPlanActionExecutor.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaClusteringPlanActionExecutor.java
new file mode 100644
index 0000000..1d78ecc
--- /dev/null
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaClusteringPlanActionExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+import java.util.Map;
+
+public class JavaClusteringPlanActionExecutor<T extends HoodieRecordPayload>
extends
+ BaseClusteringPlanActionExecutor<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>> {
+
+ public JavaClusteringPlanActionExecutor(
+ HoodieEngineContext context, HoodieWriteConfig config,
+ HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>,
List<WriteStatus>> table,
+ String instantTime, Option<Map<String, String>> extraMetadata) {
+ super(context, config, table, instantTime, extraMetadata);
+ }
+}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaExecuteClusteringCommitActionExecutor.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaExecuteClusteringCommitActionExecutor.java
new file mode 100644
index 0000000..83364bd
--- /dev/null
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/cluster/JavaExecuteClusteringCommitActionExecutor.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
+import org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor;
+
+import org.apache.avro.Schema;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class JavaExecuteClusteringCommitActionExecutor<T extends
HoodieRecordPayload<T>>
+ extends BaseJavaCommitActionExecutor<T> {
+
+ private final HoodieClusteringPlan clusteringPlan;
+
+ public JavaExecuteClusteringCommitActionExecutor(
+ HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
+ String instantTime) {
+ super(context, config, table, instantTime, WriteOperationType.CLUSTER);
+ this.clusteringPlan = ClusteringUtils.getClusteringPlan(
+ table.getMetaClient(),
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
+ .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+ "Unable to read clustering plan for instant: " + instantTime));
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> execute() {
+ HoodieInstant instant =
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
+ // Mark instant as clustering inflight
+ table.getActiveTimeline().transitionReplaceRequestedToInflight(instant,
Option.empty());
+ table.getMetaClient().reloadActiveTimeline();
+
+ final Schema schema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(config.getSchema()));
+ HoodieWriteMetadata<List<WriteStatus>> writeMetadata = (
+ (ClusteringExecutionStrategy<T, List<HoodieRecord<? extends
HoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>>)
+
ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
+ new Class<?>[] {HoodieTable.class, HoodieEngineContext.class,
HoodieWriteConfig.class}, table, context, config))
+ .performClustering(clusteringPlan, schema, instantTime);
+ List<WriteStatus> writeStatusList = writeMetadata.getWriteStatuses();
+ List<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
+
writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
+
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
+ validateWriteResult(writeMetadata);
+ commitOnAutoCommit(writeMetadata);
+ if (!writeMetadata.getCommitMetadata().isPresent()) {
+ HoodieCommitMetadata commitMetadata =
CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(),
writeMetadata.getPartitionToReplaceFileIds(),
+ extraMetadata, operationType, getSchemaToStoreInCommit(),
getCommitActionType());
+ writeMetadata.setCommitMetadata(Option.of(commitMetadata));
+ }
+ return writeMetadata;
+ }
+
+ /**
+ * Validates the actions taken by clustering. In the first implementation, we validate that at least one
+ * new file is written, but this can be extended with more checks, e.g., number of records read equals
+ * number of records written. These validations could also be moved into BaseCommitActionExecutor to
+ * reuse pre-commit hooks for multiple actions.
+ */
+ private void validateWriteResult(HoodieWriteMetadata<List<WriteStatus>>
writeMetadata) {
+ if (writeMetadata.getWriteStatuses().isEmpty()) {
+ throw new HoodieClusteringException("Clustering plan produced 0
WriteStatus for " + instantTime
+ + " #groups: " + clusteringPlan.getInputGroups().size() + " expected
at least "
+ +
clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+ + " write statuses");
+ }
+ }
+
+ @Override
+ protected String getCommitActionType() {
+ return HoodieTimeline.REPLACE_COMMIT_ACTION;
+ }
+
+ @Override
+ protected Map<String, List<String>>
getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>>
writeMetadata) {
+ Set<HoodieFileGroupId> newFilesWritten =
writeMetadata.getWriteStats().get().stream()
+ .map(s -> new HoodieFileGroupId(s.getPartitionPath(),
s.getFileId())).collect(Collectors.toSet());
+ return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
+ .filter(fg -> !newFilesWritten.contains(fg))
+ .collect(Collectors.groupingBy(fg -> fg.getPartitionPath(),
Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
+ }
+}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index 66cb407..2a93c50 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -126,13 +126,14 @@ public abstract class BaseJavaCommitActionExecutor<T
extends HoodieRecordPayload
return result;
}
- protected void updateIndex(List<WriteStatus> writeStatuses,
HoodieWriteMetadata<List<WriteStatus>> result) {
+ protected List<WriteStatus> updateIndex(List<WriteStatus> writeStatuses,
HoodieWriteMetadata<List<WriteStatus>> result) {
Instant indexStartTime = Instant.now();
// Update the index back
List<WriteStatus> statuses = HoodieList.getList(
table.getIndex().updateLocation(HoodieList.of(writeStatuses), context,
table));
result.setIndexUpdateDuration(Duration.between(indexStartTime,
Instant.now()));
result.setWriteStatuses(statuses);
+ return statuses;
}
@Override
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
new file mode 100644
index 0000000..30bdcda
--- /dev/null
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+
+/**
+ * Compacts a hoodie table with merge-on-read storage in the Java engine. Computes all
+ * possible compactions, passes them through a CompactionFilter, executes the compactions,
+ * writes a new version of the base files, and makes a normal commit.
+ */
+public class HoodieJavaMergeOnReadTableCompactor<T extends HoodieRecordPayload>
+ extends HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>,
List<WriteStatus>> {
+
+ @Override
+ public void preCompact(
+ HoodieTable table, HoodieTimeline pendingCompactionTimeline, String
compactionInstantTime) {
+ HoodieInstant inflightInstant =
HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
+ if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
+ table.rollbackInflightCompaction(inflightInstant);
+ table.getMetaClient().reloadActiveTimeline();
+ }
+ }
+
+ @Override
+ public void maybePersist(HoodieData<WriteStatus> writeStatus,
HoodieWriteConfig config) {
+ // No OP
+ }
+}
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
new file mode 100644
index 0000000..5d6f211
--- /dev/null
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.testutils.HoodieJavaClientTestBase;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestJavaBulkInsertInternalPartitioner extends
HoodieJavaClientTestBase {
+ private static final Comparator<HoodieRecord> KEY_COMPARATOR =
+ Comparator.comparing(o -> (o.getPartitionPath() + "+" +
o.getRecordKey()));
+
+ public static List<HoodieRecord> generateTestRecordsForBulkInsert(int
numRecords) {
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ List<HoodieRecord> records = dataGenerator.generateInserts("0",
numRecords);
+ return records;
+ }
+
+ public static Map<String, Long>
generatePartitionNumRecords(List<HoodieRecord> records) {
+ return records.stream().map(record -> record.getPartitionPath())
+ .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"rider", "rider,driver"})
+ public void testCustomColumnSortPartitioner(String sortColumnString) throws
Exception {
+ String[] sortColumns = sortColumnString.split(",");
+ Comparator<HoodieRecord> columnComparator =
+ getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA,
sortColumns);
+
+ List<HoodieRecord> records = generateTestRecordsForBulkInsert(1000);
+ testBulkInsertInternalPartitioner(
+ new JavaCustomColumnsSortPartitioner(sortColumns,
HoodieTestDataGenerator.AVRO_SCHEMA),
+ records, true, generatePartitionNumRecords(records),
Option.of(columnComparator));
+ }
+
+ private Comparator<HoodieRecord> getCustomColumnComparator(Schema schema,
String[] sortColumns) {
+ return Comparator.comparing(
+ record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns,
schema).toString());
+ }
+
+ private void verifyRecordAscendingOrder(List<HoodieRecord> records,
+ Option<Comparator<HoodieRecord>>
comparator) {
+ List<HoodieRecord> expectedRecords = new ArrayList<>(records);
+ Collections.sort(expectedRecords, comparator.orElse(KEY_COMPARATOR));
+ assertEquals(expectedRecords, records);
+ }
+
+ private void testBulkInsertInternalPartitioner(BulkInsertPartitioner
partitioner,
+ List<HoodieRecord> records,
+ boolean isSorted,
+ Map<String, Long>
expectedPartitionNumRecords,
+
Option<Comparator<HoodieRecord>> comparator) {
+ List<HoodieRecord> actualRecords =
+ (List<HoodieRecord>) partitioner.repartitionRecords(records, 1);
+ if (isSorted) {
+ // Verify global order
+ verifyRecordAscendingOrder(actualRecords, comparator);
+ }
+
+ // Verify number of records per partition path
+ assertEquals(expectedPartitionNumRecords,
generatePartitionNumRecords(actualRecords));
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
index 5c13277..ad19824 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -49,6 +50,7 @@ public class SparkRecentDaysClusteringPlanStrategy<T extends
HoodieRecordPayload
super(table, engineContext, writeConfig);
}
+ @Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
int targetPartitionsForClustering =
getWriteConfig().getTargetPartitionsForClustering();
int skipPartitionsFromLatestForClustering =
getWriteConfig().getSkipPartitionsFromLatestForClustering();
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index fb3c5ec..a36da5f 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -18,18 +18,15 @@
package org.apache.hudi.execution.bulkinsert;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.BulkInsertPartitioner;
-import org.apache.spark.api.java.JavaRDD;
-import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaRDD;
/**
* A partitioner that does sorting based on specified column values for each
RDD partition.
@@ -57,7 +54,8 @@ public class RDDCustomColumnsSortPartitioner<T extends
HoodieRecordPayload>
int
outputSparkPartitions) {
final String[] sortColumns = this.sortColumnNames;
final SerializableSchema schema = this.serializableSchema;
- return records.sortBy(record -> getRecordSortColumnValues(record,
sortColumns, schema),
+ return records.sortBy(
+ record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns,
schema),
true, outputSparkPartitions);
}
@@ -66,26 +64,6 @@ public class RDDCustomColumnsSortPartitioner<T extends
HoodieRecordPayload>
return true;
}
- private static Object getRecordSortColumnValues(HoodieRecord<? extends
HoodieRecordPayload> record,
- String[] sortColumns,
- SerializableSchema schema) {
- try {
- GenericRecord genericRecord = (GenericRecord)
record.getData().getInsertValue(schema.get()).get();
- if (sortColumns.length == 1) {
- return HoodieAvroUtils.getNestedFieldVal(genericRecord,
sortColumns[0], true);
- } else {
- StringBuilder sb = new StringBuilder();
- for (String col : sortColumns) {
- sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord,
col, true));
- }
-
- return sb.toString();
- }
- } catch (IOException e) {
- throw new HoodieIOException("Unable to read record with key:" +
record.getKey(), e);
- }
- }
-
private String[] getSortColumnName(HoodieWriteConfig config) {
return config.getUserDefinedBulkInsertPartitionerSortColumns().split(",");
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
index 683d852..81a0a74 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
@@ -18,20 +18,14 @@
package org.apache.hudi.table.action.cluster;
-import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import java.util.Map;
@@ -40,8 +34,6 @@ import java.util.Map;
public class SparkClusteringPlanActionExecutor<T extends HoodieRecordPayload>
extends
BaseClusteringPlanActionExecutor<T, JavaRDD<HoodieRecord<T>>,
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
- private static final Logger LOG =
LogManager.getLogger(SparkClusteringPlanActionExecutor.class);
-
public SparkClusteringPlanActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T,
JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
@@ -49,33 +41,4 @@ public class SparkClusteringPlanActionExecutor<T extends
HoodieRecordPayload> ex
Option<Map<String, String>>
extraMetadata) {
super(context, config, table, instantTime, extraMetadata);
}
-
- @Override
- protected Option<HoodieClusteringPlan> createClusteringPlan() {
- LOG.info("Checking if clustering needs to be run on " +
config.getBasePath());
- Option<HoodieInstant> lastClusteringInstant =
table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant();
-
- int commitsSinceLastClustering =
table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
-
.findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"),
Integer.MAX_VALUE)
- .countInstants();
- if (config.inlineClusteringEnabled() &&
config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
- LOG.info("Not scheduling inline clustering as only " +
commitsSinceLastClustering
- + " commits was found since last clustering " +
lastClusteringInstant + ". Waiting for "
- + config.getInlineClusterMaxCommits());
- return Option.empty();
- }
-
- if (config.isAsyncClusteringEnabled() &&
config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {
- LOG.info("Not scheduling async clustering as only " +
commitsSinceLastClustering
- + " commits was found since last clustering " +
lastClusteringInstant + ". Waiting for "
- + config.getAsyncClusterMaxCommits());
- return Option.empty();
- }
-
- LOG.info("Generating clustering plan for table " + config.getBasePath());
- ClusteringPlanStrategy strategy = (ClusteringPlanStrategy)
- ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(),
table, context, config);
- return strategy.generateClusteringPlan();
- }
-
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index a602b45..ff8aefe 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,8 +18,10 @@
package org.apache.hudi.avro;
+import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -229,10 +231,10 @@ public class HoodieAvroUtils {
public static Schema removeFields(Schema schema, List<String>
fieldsToRemove) {
List<Schema.Field> filteredFields = schema.getFields()
- .stream()
- .filter(field ->
!fieldsToRemove.contains(field.name()))
- .map(field -> new
Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
- .collect(Collectors.toList());
+ .stream()
+ .filter(field -> !fieldsToRemove.contains(field.name()))
+ .map(field -> new Schema.Field(field.name(), field.schema(),
field.doc(), field.defaultVal()))
+ .collect(Collectors.toList());
Schema filteredSchema = Schema.createRecord(schema.getName(),
schema.getDoc(), schema.getNamespace(), false);
filteredSchema.setFields(filteredFields);
return filteredSchema;
@@ -289,7 +291,7 @@ public class HoodieAvroUtils {
}
public static GenericRecord addHoodieKeyToRecord(GenericRecord record,
String recordKey, String partitionPath,
- String fileName) {
+ String fileName) {
record.put(HoodieRecord.FILENAME_METADATA_FIELD, fileName);
record.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPath);
record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey);
@@ -551,7 +553,7 @@ public class HoodieAvroUtils {
} else if (fieldSchema.getType() == Schema.Type.BYTES) {
ByteBuffer byteBuffer = (ByteBuffer) fieldValue;
BigDecimal convertedValue = decimalConversion.fromBytes(byteBuffer,
fieldSchema,
- LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
+ LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
byteBuffer.rewind();
return convertedValue;
}
@@ -570,9 +572,51 @@ public class HoodieAvroUtils {
* @return sanitized name
*/
public static String sanitizeName(String name) {
- if (name.substring(0,1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
+ if (name.substring(0, 1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES,
MASK_FOR_INVALID_CHARS_IN_NAMES);
}
return name.replaceAll(INVALID_AVRO_CHARS_IN_NAMES,
MASK_FOR_INVALID_CHARS_IN_NAMES);
}
+
+ /**
+ * Gets record column values into one object.
+ *
+ * @param record Hoodie record.
+ * @param columns Names of the columns to get values.
+ * @param schema {@link Schema} instance.
+ * @return Column value if a single column, or the concatenated String of all column values.
+ */
+ public static Object getRecordColumnValues(HoodieRecord<? extends
HoodieRecordPayload> record,
+ String[] columns,
+ Schema schema) {
+ try {
+ GenericRecord genericRecord = (GenericRecord)
record.getData().getInsertValue(schema).get();
+ if (columns.length == 1) {
+ return HoodieAvroUtils.getNestedFieldVal(genericRecord, columns[0],
true);
+ } else {
+ StringBuilder sb = new StringBuilder();
+ for (String col : columns) {
+ sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord,
col, true));
+ }
+
+ return sb.toString();
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Unable to read record with key:" +
record.getKey(), e);
+ }
+ }
+
+ /**
+ * Gets record column values into one object.
+ *
+ * @param record Hoodie record.
+ * @param columns Names of the columns to get values.
+ * @param schema {@link SerializableSchema} instance.
+ * @return Column value if a single column, or the concatenated String of all column values.
+ */
+ public static Object getRecordColumnValues(HoodieRecord<? extends
HoodieRecordPayload> record,
+ String[] columns,
+ SerializableSchema schema) {
+ return getRecordColumnValues(record, columns, schema.get());
+ }
}
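For illustration, a minimal sketch (not part of this patch; the class name is hypothetical) of sorting records by custom columns with the new `HoodieAvroUtils.getRecordColumnValues` helper, mirroring how `JavaCustomColumnsSortPartitioner` and the test above use it:

```java
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;

import java.util.Comparator;
import java.util.List;

public class CustomColumnSortSketch {

  /**
   * Sorts records in place by the given columns. Each record is keyed by the value
   * returned from getRecordColumnValues: the single column value when one column is
   * given, or the concatenated string of all column values otherwise.
   */
  public static void sortByColumns(List<HoodieRecord> records, String[] sortColumns, Schema schema) {
    Comparator<HoodieRecord> byColumns = Comparator.comparing(
        record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema).toString());
    records.sort(byColumns);
  }
}
```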
diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md
index f5087ce..1754eb8 100644
--- a/hudi-kafka-connect/README.md
+++ b/hudi-kafka-connect/README.md
@@ -106,7 +106,7 @@ to generate, with each batch containing a number of
messages and idle time betwe
bash setupKafka.sh -n <num_kafka_messages_per_batch> -b <num_batches>
```
-### 4 - Run the Sink connector worker (multiple workers can be run)
+### 5 - Run the Sink connector worker (multiple workers can be run)
Kafka Connect is a distributed platform, with the ability to run one or more workers (each running multiple tasks)
that process the records from the Kafka partitions for the same topic in parallel. We provide a properties file with
@@ -120,7 +120,7 @@ cd $KAFKA_HOME
./bin/connect-distributed.sh
$HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
```
-### 5- To add the Hudi Sink to the Connector (delete it if you want to
re-configure)
+### 6 - To add the Hudi Sink to the Connector (delete it if you want to
re-configure)
Once the Connector has started, it will not run the Sink, until the Hudi sink
is added using the web api. The following
curl APIs can be used to delete and add a new Hudi Sink. Again, a default
configuration is provided for the Hudi Sink,
@@ -170,4 +170,132 @@ total 5168
-rw-r--r-- 1 user wheel 440214 Sep 13 21:43
E200FA75DCD1CED60BE86BCE6BF5D23A-0_0-0-0_20210913214114.parquet
```
+### 7 - Run async compaction and clustering if scheduled
+When using Merge-On-Read (MOR) as the table type, async compaction and clustering can be scheduled while the Sink is
+running. Inline compaction and clustering are disabled by default for performance reasons. By default, async
+compaction scheduling is enabled, and you can disable it by setting `hoodie.kafka.compaction.async.enable` to `false`.
+Async clustering scheduling is disabled by default, and you can enable it by setting `hoodie.clustering.async.enabled`
+to `true`.
+
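+For illustration, a minimal excerpt of the sink configuration with both scheduling switches set explicitly (the
+remaining connector settings stay as in `demo/config-sink.json`; the values below are an example, not the defaults):
+
+```json
+{
+  "hoodie.table.type": "MERGE_ON_READ",
+  "hoodie.kafka.compaction.async.enable": "true",
+  "hoodie.clustering.async.enabled": "true"
+}
+```
+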
+The Sink only schedules compaction and clustering when necessary and, for performance reasons, does not execute
+them. You need to execute the scheduled compaction and clustering using separate Spark jobs or the Hudi CLI.
+
+After the compaction is scheduled, you can see the requested compaction
instant (`20211111111410.compaction.requested`)
+below:
+
+```
+ls -l /tmp/hoodie/hudi-test-topic/.hoodie
+total 280
+-rw-r--r-- 1 user wheel 21172 Nov 11 11:09 20211111110807.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:08
20211111110807.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:08
20211111110807.deltacommit.requested
+-rw-r--r-- 1 user wheel 22458 Nov 11 11:11 20211111110940.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:09
20211111110940.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:09
20211111110940.deltacommit.requested
+-rw-r--r-- 1 user wheel 21445 Nov 11 11:13 20211111111110.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:11
20211111111110.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:11
20211111111110.deltacommit.requested
+-rw-r--r-- 1 user wheel 24943 Nov 11 11:14 20211111111303.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:13
20211111111303.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:13
20211111111303.deltacommit.requested
+-rw-r--r-- 1 user wheel 9885 Nov 11 11:14
20211111111410.compaction.requested
+-rw-r--r-- 1 user wheel 21192 Nov 11 11:15 20211111111411.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:14
20211111111411.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:14
20211111111411.deltacommit.requested
+-rw-r--r-- 1 user wheel 0 Nov 11 11:15
20211111111530.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:15
20211111111530.deltacommit.requested
+drwxr-xr-x 2 user wheel 64 Nov 11 11:08 archived
+-rw-r--r-- 1 user wheel 387 Nov 11 11:08 hoodie.properties
+```
+
+Then you can run the async compaction job with `HoodieCompactor` via `spark-submit`:
+
+```
+spark-submit \
+ --class org.apache.hudi.utilities.HoodieCompactor \
+
hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.10.0-SNAPSHOT.jar
\
+ --base-path /tmp/hoodie/hudi-test-topic \
+ --table-name hudi-test-topic \
+ --schema-file /Users/user/repo/hudi/docker/demo/config/schema.avsc \
+ --instant-time 20211111111410 \
+ --parallelism 2 \
+ --spark-memory 1g
+```
+
+Note that you don't have to provide the instant time through `--instant-time`.
In that case, the earliest scheduled
+compaction is going to be executed.
+
+Alternatively, you can use Hudi CLI to execute compaction:
+
+```
+hudi-> connect --path /tmp/hoodie/hudi-test-topic
+hudi:hudi-test-topic-> compactions show all
+╔═════════════════════════╤═══════════╤═══════════════════════════════╗
+║ Compaction Instant Time │ State │ Total FileIds to be Compacted ║
+╠═════════════════════════╪═══════════╪═══════════════════════════════╣
+║ 20211111111410 │ REQUESTED │ 12 ║
+╚═════════════════════════╧═══════════╧═══════════════════════════════╝
+
+compaction validate --instant 20211111111410
+compaction run --compactionInstant 20211111111410 --parallelism 2
--schemaFilePath /Users/user/repo/hudi/docker/demo/config/schema.avsc
+```
+
+Similarly, you can see the requested clustering instant
(`20211111111813.replacecommit.requested`) after it is scheduled
+by the Sink:
+
+```
+ls -l /tmp/hoodie/hudi-test-topic/.hoodie
+total 736
+-rw-r--r-- 1 user wheel 24943 Nov 11 11:14 20211111111303.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:13
20211111111303.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:13
20211111111303.deltacommit.requested
+-rw-r--r-- 1 user wheel 18681 Nov 11 11:17 20211111111410.commit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:17
20211111111410.compaction.inflight
+-rw-r--r-- 1 user wheel 9885 Nov 11 11:14
20211111111410.compaction.requested
+-rw-r--r-- 1 user wheel 21192 Nov 11 11:15 20211111111411.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:14
20211111111411.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:14
20211111111411.deltacommit.requested
+-rw-r--r-- 1 user wheel 22460 Nov 11 11:17 20211111111530.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:15
20211111111530.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:15
20211111111530.deltacommit.requested
+-rw-r--r-- 1 user wheel 21357 Nov 11 11:18 20211111111711.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:17
20211111111711.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:17
20211111111711.deltacommit.requested
+-rw-r--r-- 1 user wheel 6516 Nov 11 11:18
20211111111813.replacecommit.requested
+-rw-r--r-- 1 user wheel 26070 Nov 11 11:20 20211111111815.deltacommit
+-rw-r--r-- 1 user wheel 0 Nov 11 11:18
20211111111815.deltacommit.inflight
+-rw-r--r-- 1 user wheel 0 Nov 11 11:18
20211111111815.deltacommit.requested
+```
+
+Then you can run the async clustering job with `HoodieClusteringJob` via `spark-submit`:
+
+```
+spark-submit \
+ --class org.apache.hudi.utilities.HoodieClusteringJob \
+
hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.10.0-SNAPSHOT.jar
\
+ --props clusteringjob.properties \
+ --mode execute \
+ --base-path /tmp/hoodie/hudi-test-topic \
+ --table-name sample_table \
+ --instant-time 20211111111813 \
+ --spark-memory 1g
+```
+
+Sample `clusteringjob.properties`:
+
+```
+hoodie.datasource.write.recordkey.field=volume
+hoodie.datasource.write.partitionpath.field=date
+hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/hudi-test-topic/versions/latest
+
+hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
+hoodie.clustering.plan.strategy.small.file.limit=629145600
+hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
+hoodie.clustering.plan.strategy.sort.columns=volume
+
+hoodie.write.concurrency.mode=single_writer
+```
+
+Note that you don't have to provide the instant time through `--instant-time`.
In that case, the earliest scheduled
+clustering is going to be executed.
diff --git a/hudi-kafka-connect/demo/config-sink.json
b/hudi-kafka-connect/demo/config-sink.json
index 2d2be00..12a2f50 100644
--- a/hudi-kafka-connect/demo/config-sink.json
+++ b/hudi-kafka-connect/demo/config-sink.json
@@ -10,6 +10,7 @@
"topics": "hudi-test-topic",
"hoodie.table.name": "hudi-test-topic",
"hoodie.table.type": "MERGE_ON_READ",
+ "hoodie.metadata.enable": "false",
"hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
"hoodie.datasource.write.recordkey.field": "volume",
"hoodie.datasource.write.partitionpath.field": "date",
diff --git
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
index 773ce1e..714d4d2 100644
---
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
+++
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
@@ -73,6 +73,11 @@ public class KafkaConnectConfigs extends HoodieConfig {
+ "the coordinator will wait for the write statuses from all the
partitions"
+ "to ignore the current commit and start a new commit.");
+ public static final ConfigProperty<String> ASYNC_COMPACT_ENABLE =
ConfigProperty
+ .key("hoodie.kafka.compaction.async.enable")
+ .defaultValue("true")
+ .withDocumentation("Controls whether async compaction should be turned
on for MOR table writing.");
+
public static final ConfigProperty<String> META_SYNC_ENABLE = ConfigProperty
.key("hoodie.meta.sync.enable")
.defaultValue("false")
@@ -121,6 +126,10 @@ public class KafkaConnectConfigs extends HoodieConfig {
return getString(KAFKA_VALUE_CONVERTER);
}
+ public Boolean isAsyncCompactEnabled() {
+ return getBoolean(ASYNC_COMPACT_ENABLE);
+ }
+
public Boolean isMetaSyncEnabled() {
return getBoolean(META_SYNC_ENABLE);
}
diff --git
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index 8039e56..7ec3034 100644
---
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -54,6 +55,8 @@ public class KafkaConnectTransactionServices implements
ConnectTransactionServic
private final Option<HoodieTableMetaClient> tableMetaClient;
private final Configuration hadoopConf;
+ private final HoodieWriteConfig writeConfig;
+ private final KafkaConnectConfigs connectConfigs;
private final String tableBasePath;
private final String tableName;
private final HoodieEngineContext context;
@@ -61,8 +64,11 @@ public class KafkaConnectTransactionServices implements
ConnectTransactionServic
private final HoodieJavaWriteClient<HoodieAvroPayload> javaClient;
public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs)
throws HoodieException {
- HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
- .withProperties(connectConfigs.getProps()).build();
+ this.connectConfigs = connectConfigs;
+ this.writeConfig = HoodieWriteConfig.newBuilder()
+ .withEngineType(EngineType.JAVA)
+ .withProperties(connectConfigs.getProps())
+ .build();
tableBasePath = writeConfig.getBasePath();
tableName = writeConfig.getTableName();
@@ -95,6 +101,7 @@ public class KafkaConnectTransactionServices implements
ConnectTransactionServic
}
}
+ @Override
public String startCommit() {
String newCommitTime = javaClient.startCommit();
javaClient.transitionInflight(newCommitTime);
@@ -102,11 +109,23 @@ public class KafkaConnectTransactionServices implements
ConnectTransactionServic
return newCommitTime;
}
+ @Override
public void endCommit(String commitTime, List<WriteStatus> writeStatuses,
Map<String, String> extraMetadata) {
javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
LOG.info("Ending Hudi commit " + commitTime);
+
+ // Schedule clustering and compaction as needed.
+ if (writeConfig.isAsyncClusteringEnabled()) {
+ javaClient.scheduleClustering(Option.empty()).ifPresent(
+ instantTs -> LOG.info("Scheduled clustering at instant time:" +
instantTs));
+ }
+ if (isAsyncCompactionEnabled()) {
+ javaClient.scheduleCompaction(Option.empty()).ifPresent(
+ instantTs -> LOG.info("Scheduled compaction at instant time:" +
instantTs));
+ }
}
+ @Override
public Map<String, String> fetchLatestExtraCommitMetadata() {
if (tableMetaClient.isPresent()) {
Option<HoodieCommitMetadata> metadata =
KafkaConnectUtils.getCommitMetadataForLatestInstant(tableMetaClient.get());
@@ -119,4 +138,10 @@ public class KafkaConnectTransactionServices implements
ConnectTransactionServic
}
throw new HoodieException("Fatal error retrieving Hoodie Extra Metadata
since Table Meta Client is absent");
}
+
+ private boolean isAsyncCompactionEnabled() {
+ return tableMetaClient.isPresent()
+ &&
HoodieTableType.MERGE_ON_READ.equals(tableMetaClient.get().getTableType())
+ && connectConfigs.isAsyncCompactEnabled();
+ }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index 75f55bb..ce69eff 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -164,7 +164,7 @@ public class HoodieCompactor {
// Get schema.
SparkRDDWriteClient client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism,
Option.of(cfg.strategyClassName), props);
- if (cfg.compactionInstantTime == null) {
+ if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
throw new IllegalArgumentException("No instant time is provided for
scheduling compaction. "
+ "Please specify the compaction instant time by using
--instant-time.");
}