This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5804ad8 [HUDI-1483] Support async clustering for deltastreamer and Spark streaming (#3142)
5804ad8 is described below
commit 5804ad8e32ae05758ebc5e47f5d4fb4db371ab52
Author: Sagar Sumit <[email protected]>
AuthorDate: Mon Jul 12 00:13:38 2021 +0530
[HUDI-1483] Support async clustering for deltastreamer and Spark streaming (#3142)
- Integrate async clustering service with HoodieDeltaStreamer and HoodieStreamingSink
- Added methods in HoodieAsyncService to reuse code
---
.../apache/hudi/async/AsyncClusteringService.java | 100 +++++++++++++++++++++
.../org/apache/hudi/async/AsyncCompactService.java | 56 +-----------
.../org/apache/hudi/async/HoodieAsyncService.java | 60 ++++++++++++-
.../hudi/client/AbstractClusteringClient.java | 55 ++++++++++++
.../apache/hudi/config/HoodieClusteringConfig.java | 13 ++-
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +
.../hudi/async/SparkAsyncClusteringService.java | 39 ++++++++
.../hudi/client/HoodieSparkClusteringClient.java | 56 ++++++++++++
.../cluster/SparkClusteringPlanActionExecutor.java | 11 ++-
.../apache/hudi/common/util/ClusteringUtils.java | 4 +
.../main/java/org/apache/hudi/DataSourceUtils.java | 6 ++
.../scala/org/apache/hudi/DataSourceOptions.scala | 12 +++
.../SparkStreamingAsyncClusteringService.java | 42 +++++++++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 48 +++++++---
.../org/apache/hudi/HoodieStreamingSink.scala | 55 ++++++++++--
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 2 +
.../hudi-spark/src/test/java/HoodieJavaApp.java | 3 +
.../src/test/java/HoodieJavaStreamingApp.java | 1 +
.../hudi/functional/TestStructuredStreaming.scala | 48 ++++++++--
.../hudi/utilities/deltastreamer/DeltaSync.java | 20 +++++
.../deltastreamer/HoodieDeltaStreamer.java | 83 ++++++++++++++---
.../HoodieMultiTableDeltaStreamer.java | 6 ++
.../functional/TestHoodieDeltaStreamer.java | 98 +++++++++++++++-----
23 files changed, 710 insertions(+), 112 deletions(-)
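
For orientation before the file-by-file diff, a minimal usage sketch (not part of this commit) of how a Spark structured streaming job opts into the new async clustering path; the input stream, table path, table name, and checkpoint location are placeholders, and the option key is the one added below in DataSourceOptions.scala:

    import org.apache.hudi.DataSourceWriteOptions;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.streaming.OutputMode;
    import org.apache.spark.sql.streaming.StreamingQuery;

    public class AsyncClusteringStreamSketch {
      // 'streamingInput' is a placeholder streaming Dataset supplied by the caller.
      public static StreamingQuery start(Dataset<Row> streamingInput, String tablePath,
                                         String tableName, String checkpointLocation) throws Exception {
        return streamingInput.writeStream()
            .format("org.apache.hudi")
            // new in this commit: run clustering on a background thread instead of inline
            .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true")
            .option(HoodieWriteConfig.TABLE_NAME.key(), tableName)
            .option("checkpointLocation", checkpointLocation)
            .outputMode(OutputMode.Append())
            .start(tablePath);
      }
    }
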
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
new file mode 100644
index 0000000..b9707bb
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.AbstractClusteringClient;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+
+/**
+ * Async clustering service that runs in a separate thread.
+ * Currently, only one clustering thread is allowed to run at any time.
+ */
+public abstract class AsyncClusteringService extends HoodieAsyncService {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class);
+
+ private final int maxConcurrentClustering;
+ private transient AbstractClusteringClient clusteringClient;
+
+ public AsyncClusteringService(AbstractHoodieWriteClient writeClient) {
+ this(writeClient, false);
+ }
+
+ public AsyncClusteringService(AbstractHoodieWriteClient writeClient, boolean runInDaemonMode) {
+ super(runInDaemonMode);
+ this.clusteringClient = createClusteringClient(writeClient);
+ this.maxConcurrentClustering = 1;
+ }
+
+ protected abstract AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client);
+
+ /**
+ * Start clustering service.
+ */
+ @Override
+ protected Pair<CompletableFuture, ExecutorService> startService() {
+ ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentClustering,
+ r -> {
+ Thread t = new Thread(r, "async_clustering_thread");
+ t.setDaemon(isRunInDaemonMode());
+ return t;
+ });
+
+ return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentClustering).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
+ try {
+ while (!isShutdownRequested()) {
+ final HoodieInstant instant = fetchNextAsyncServiceInstant();
+ if (null != instant) {
+ LOG.info("Starting clustering for instant " + instant);
+ clusteringClient.cluster(instant);
+ LOG.info("Finished clustering for instant " + instant);
+ }
+ }
+ LOG.info("Clustering executor shutting down properly");
+ } catch (InterruptedException ie) {
+ LOG.warn("Clustering executor got interrupted exception! Stopping", ie);
+ } catch (IOException e) {
+ LOG.error("Clustering executor failed", e);
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ return true;
+ }, executor)).toArray(CompletableFuture[]::new)), executor);
+ }
+
+ /**
+ * Update the write client to be used for clustering.
+ */
+ public synchronized void updateWriteClient(AbstractHoodieWriteClient writeClient) {
+ this.clusteringClient.updateWriteClient(writeClient);
+ }
+}
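
The service above is consumed in a producer/consumer fashion. A sketch of the intended lifecycle, assuming an already-configured write client (SparkAsyncClusteringService, added later in this commit, is the concrete subclass; the instant timestamp is made up):

    import org.apache.hudi.async.SparkAsyncClusteringService;
    import org.apache.hudi.client.AbstractHoodieWriteClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.table.timeline.HoodieInstant.State;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;

    public class AsyncClusteringLifecycleSketch {
      public static void run(AbstractHoodieWriteClient writeClient) {
        SparkAsyncClusteringService service = new SparkAsyncClusteringService(writeClient);
        // start() takes a shutdown callback; 'errored' is true on abnormal exit
        service.start(errored -> {
          // release resources here if needed
          return true;
        });
        // hand a previously scheduled clustering plan (a REQUESTED replacecommit)
        // to the background clustering thread
        service.enqueuePendingAsyncServiceInstant(new HoodieInstant(
            State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "20210712001338"));
      }
    }
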
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
index 1bb5daa..2f63297 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
@@ -24,18 +24,14 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
/**
@@ -54,9 +50,6 @@ public abstract class AsyncCompactService extends HoodieAsyncService {
private final int maxConcurrentCompaction;
private transient AbstractCompactor compactor;
protected transient HoodieEngineContext context;
- private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<>();
- private transient ReentrantLock queueLock = new ReentrantLock();
- private transient Condition consumed = queueLock.newCondition();
public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client) {
this(context, client, false);
@@ -72,51 +65,6 @@ public abstract class AsyncCompactService extends HoodieAsyncService {
protected abstract AbstractCompactor createCompactor(AbstractHoodieWriteClient client);
/**
- * Enqueues new Pending compaction.
- */
- public void enqueuePendingCompaction(HoodieInstant instant) {
- pendingCompactions.add(instant);
- }
-
- /**
- * Wait till outstanding pending compactions reduces to the passed in value.
- *
- * @param numPendingCompactions Maximum pending compactions allowed
- * @throws InterruptedException
- */
- public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException {
- try {
- queueLock.lock();
- while (!isShutdown() && (pendingCompactions.size() > numPendingCompactions)) {
- consumed.await();
- }
- } finally {
- queueLock.unlock();
- }
- }
-
- /**
- * Fetch Next pending compaction if available.
- *
- * @return
- * @throws InterruptedException
- */
- private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
- LOG.info("Compactor waiting for next instant for compaction upto 60 seconds");
- HoodieInstant instant = pendingCompactions.poll(10, TimeUnit.SECONDS);
- if (instant != null) {
- try {
- queueLock.lock();
- // Signal waiting thread
- consumed.signal();
- } finally {
- queueLock.unlock();
- }
- }
- return instant;
- }
-
- /**
* Start Compaction Service.
*/
@Override
@@ -134,7 +82,7 @@ public abstract class AsyncCompactService extends HoodieAsyncService {
context.setProperty(EngineProperty.COMPACTION_POOL_NAME, COMPACT_POOL_NAME);
while (!isShutdownRequested()) {
- final HoodieInstant instant = fetchNextCompactionInstant();
+ final HoodieInstant instant = fetchNextAsyncServiceInstant();
if (null != instant) {
LOG.info("Starting Compaction for instant " + instant);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
index 32dd042..85e0081 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
@@ -18,21 +18,26 @@
package org.apache.hudi.async;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.Serializable;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
/**
- * Base Class for running clean/delta-sync/compaction in separate thread and controlling their life-cycle.
+ * Base Class for running clean/delta-sync/compaction/clustering in separate thread and controlling their life-cycle.
*/
public abstract class HoodieAsyncService implements Serializable {
@@ -50,6 +55,12 @@ public abstract class HoodieAsyncService implements Serializable {
private transient CompletableFuture future;
// Run in daemon mode
private final boolean runInDaemonMode;
+ // Queue to hold pending compaction/clustering instants
+ private transient BlockingQueue<HoodieInstant> pendingInstants = new LinkedBlockingQueue<>();
+ // Mutex lock for synchronized access to pendingInstants queue
+ private transient ReentrantLock queueLock = new ReentrantLock();
+ // Condition instance to use with the queueLock
+ private transient Condition consumed = queueLock.newCondition();
protected HoodieAsyncService() {
this(false);
@@ -165,4 +176,51 @@ public abstract class HoodieAsyncService implements Serializable {
public boolean isRunInDaemonMode() {
return runInDaemonMode;
}
+
+ /**
+ * Wait till outstanding pending compaction/clustering reduces to the passed in value.
+ *
+ * @param numPending Maximum pending compactions/clustering allowed
+ * @throws InterruptedException
+ */
+ public void waitTillPendingAsyncServiceInstantsReducesTo(int numPending) throws InterruptedException {
+ try {
+ queueLock.lock();
+ while (!isShutdown() && (pendingInstants.size() > numPending)) {
+ consumed.await();
+ }
+ } finally {
+ queueLock.unlock();
+ }
+ }
+
+ /**
+ * Enqueues new pending clustering instant.
+ * @param instant {@link HoodieInstant} to enqueue.
+ */
+ public void enqueuePendingAsyncServiceInstant(HoodieInstant instant) {
+ LOG.info("Enqueuing new pending clustering instant: " +
instant.getTimestamp());
+ pendingInstants.add(instant);
+ }
+
+ /**
+ * Fetch next pending compaction/clustering instant if available.
+ *
+ * @return {@link HoodieInstant} corresponding to the next pending compaction/clustering.
+ * @throws InterruptedException
+ */
+ HoodieInstant fetchNextAsyncServiceInstant() throws InterruptedException {
+ LOG.info("Waiting for next instant upto 10 seconds");
+ HoodieInstant instant = pendingInstants.poll(10, TimeUnit.SECONDS);
+ if (instant != null) {
+ try {
+ queueLock.lock();
+ // Signal waiting thread
+ consumed.signal();
+ } finally {
+ queueLock.unlock();
+ }
+ }
+ return instant;
+ }
}
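
The queue plus lock/condition pair above also gives producers a backpressure hook. A sketch of the handshake, assuming 'service' is a running HoodieAsyncService subclass and 'instant' a freshly scheduled plan (the bound of 5 mirrors the deltastreamer's --max-pending-clustering default):

    // Producer side: enqueue newly scheduled work, then block until the
    // consumer thread has drained the queue to at most 5 pending instants.
    // fetchNextAsyncServiceInstant() (called inside startService) polls the
    // queue for up to 10 seconds and signals 'consumed', waking this waiter.
    service.enqueuePendingAsyncServiceInstant(instant);
    service.waitTillPendingAsyncServiceInstantsReducesTo(5);
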
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java
new file mode 100644
index 0000000..34234f5
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Client will run one round of clustering.
+ */
+public abstract class AbstractClusteringClient<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ protected transient AbstractHoodieWriteClient<T, I, K, O> clusteringClient;
+
+ public AbstractClusteringClient(AbstractHoodieWriteClient<T, I, K, O> clusteringClient) {
+ this.clusteringClient = clusteringClient;
+ }
+
+ /**
+ * Run clustering for the instant.
+ * @param instant
+ * @throws IOException
+ */
+ public abstract void cluster(HoodieInstant instant) throws IOException;
+
+ /**
+ * Update the write client used by async clustering.
+ * @param writeClient
+ */
+ public void updateWriteClient(AbstractHoodieWriteClient<T, I, K, O> writeClient) {
+ this.clusteringClient = writeClient;
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 86d26d3..a750fa2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -53,7 +53,13 @@ public class HoodieClusteringConfig extends HoodieConfig {
.key("hoodie.clustering.inline.max.commits")
.defaultValue("4")
.sinceVersion("0.7.0")
- .withDocumentation("Config to control frequency of clustering");
+ .withDocumentation("Config to control frequency of inline clustering");
+
+ public static final ConfigProperty<String> ASYNC_CLUSTERING_MAX_COMMIT_PROP = ConfigProperty
+ .key("hoodie.clustering.async.max.commits")
+ .defaultValue("4")
+ .sinceVersion("0.9.0")
+ .withDocumentation("Config to control frequency of async clustering");
// Any strategy specific params can be saved with this prefix
public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
@@ -177,6 +183,11 @@ public class HoodieClusteringConfig extends HoodieConfig {
return this;
}
+ public Builder withAsyncClusteringMaxCommits(int numCommits) {
+ clusteringConfig.setValue(ASYNC_CLUSTERING_MAX_COMMIT_PROP, String.valueOf(numCommits));
+ return this;
+ }
+
public Builder fromProperties(Properties props) {
this.clusteringConfig.getProps().putAll(props);
return this;
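
A sketch of setting the new knob programmatically, using the builder methods from this file together with HoodieWriteConfig (the base path is illustrative; withAsyncClustering is the builder method used by the DataSourceUtils change further below):

    import org.apache.hudi.config.HoodieClusteringConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")  // illustrative base path
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            .withAsyncClustering(true)
            .withAsyncClusteringMaxCommits(4)  // schedule a plan only after 4 new commits
            .build())
        .build();
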
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 1573c7f..20d2846 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -677,6 +677,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getInt(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP);
}
+ public int getAsyncClusterMaxCommits() {
+ return getInt(HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP);
+ }
+
public String getPayloadClass() {
return getString(HoodieCompactionConfig.PAYLOAD_CLASS_PROP);
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java
new file mode 100644
index 0000000..ce436ba
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/async/SparkAsyncClusteringService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.AbstractClusteringClient;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.HoodieSparkClusteringClient;
+
+/**
+ * Async clustering service for Spark datasource.
+ */
+public class SparkAsyncClusteringService extends AsyncClusteringService {
+
+ public SparkAsyncClusteringService(AbstractHoodieWriteClient writeClient) {
+ super(writeClient);
+ }
+
+ @Override
+ protected AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client) {
+ return new HoodieSparkClusteringClient(client);
+ }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java
new file mode 100644
index 0000000..884b555
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.IOException;
+
+/**
+ * Async clustering client for Spark datasource.
+ */
+public class HoodieSparkClusteringClient<T extends HoodieRecordPayload> extends
+ AbstractClusteringClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieSparkClusteringClient.class);
+
+ public HoodieSparkClusteringClient(
+ AbstractHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> clusteringClient) {
+ super(clusteringClient);
+ }
+
+ @Override
+ public void cluster(HoodieInstant instant) throws IOException {
+ LOG.info("Executing clustering instance " + instant);
+ SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>) clusteringClient;
+ JavaRDD<WriteStatus> res = writeClient.cluster(instant.getTimestamp(), true).getWriteStatuses();
+ if (res != null && res.collect().stream().anyMatch(WriteStatus::hasErrors)) {
+ // TODO: Should we treat this fatal and throw exception?
+ LOG.error("Clustering for instant (" + instant + ") failed with write
errors");
+ }
+ }
+}
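
For context, the wrapper above reduces one clustering round to a single write-client call; a sketch (clusteringInstantTime is a placeholder for a requested replacecommit's timestamp, writeClient a configured SparkRDDWriteClient):

    // What HoodieSparkClusteringClient.cluster(instant) boils down to:
    JavaRDD<WriteStatus> statuses =
        writeClient.cluster(clusteringInstantTime, true).getWriteStatuses();
    // surface (but, as above, do not throw on) per-record write failures
    boolean hasErrors = statuses.collect().stream().anyMatch(WriteStatus::hasErrors);
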
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
index 1f71aa4..683d852 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java
@@ -58,13 +58,20 @@ public class SparkClusteringPlanActionExecutor<T extends HoodieRecordPayload> ex
int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
.findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
.countInstants();
- if (config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
- LOG.info("Not scheduling clustering as only " + commitsSinceLastClustering
+ if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
+ LOG.info("Not scheduling inline clustering as only " + commitsSinceLastClustering
+ " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
+ config.getInlineClusterMaxCommits());
return Option.empty();
}
+ if (config.isAsyncClusteringEnabled() && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {
+ LOG.info("Not scheduling async clustering as only " + commitsSinceLastClustering
+ + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
+ + config.getAsyncClusterMaxCommits());
+ return Option.empty();
+ }
+
LOG.info("Generating clustering plan for table " + config.getBasePath());
ClusteringPlanStrategy strategy = (ClusteringPlanStrategy) ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), table, context, config);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 1bf97c6..0d790be 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -206,4 +206,8 @@ public class ClusteringUtils {
metrics.put(TOTAL_LOG_FILES, (double) numLogFiles);
return metrics;
}
+
+ public static List<HoodieInstant> getPendingClusteringInstantTimes(HoodieTableMetaClient metaClient) {
+ return metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
+ }
}
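
This helper is what lets a restarted writer re-discover clustering plans scheduled before a crash; a sketch of the recovery pattern (also used by HoodieStreamingSink below; hadoopConf, basePath, and the service instance are placeholders):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.util.ClusteringUtils;

    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(hadoopConf).setBasePath(basePath).build();
    // re-enqueue every pending (requested/inflight) replacecommit
    for (HoodieInstant pending : ClusteringUtils.getPendingClusteringInstantTimes(metaClient)) {
      asyncClusteringService.enqueuePendingAsyncServiceInstant(pending);
    }
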
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 352a0ca..469f9c7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
@@ -171,6 +172,8 @@ public class DataSourceUtils {
boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key()));
boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY().key())
.equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
+ boolean asyncClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key()));
+ boolean inlineClusteringEnabled = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INLINE_CLUSTERING_ENABLE_OPT_KEY().key()));
// insert/bulk-insert combining to be true, if filtering for duplicates
boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY().key()));
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
@@ -184,6 +187,9 @@ public class DataSourceUtils {
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY().key()))
.withInlineCompaction(inlineCompact).build())
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withInlineClustering(inlineClusteringEnabled)
+ .withAsyncClustering(asyncClusteringEnabled).build())
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY().key()))
.build())
// override above with Hoodie configs specified as options.
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index ebb8c6b..ce36831 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -474,6 +474,18 @@ object DataSourceWriteOptions {
.defaultValue("true")
.withDocumentation("")
+ val INLINE_CLUSTERING_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.datasource.clustering.inline.enable")
+ .defaultValue("false")
+ .sinceVersion("0.9.0")
+ .withDocumentation("Enable inline clustering. Disabled by default.")
+
+ val ASYNC_CLUSTERING_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.datasource.clustering.async.enable")
+ .defaultValue("false")
+ .sinceVersion("0.9.0")
+ .withDocumentation("Enable asynchronous clustering. Disabled by default.")
+
val KAFKA_AVRO_VALUE_DESERIALIZER_CLASS: ConfigProperty[String] = ConfigProperty
.key("hoodie.deltastreamer.source.kafka.value.deserializer.class")
.defaultValue("io.confluent.kafka.serializers.KafkaAvroDeserializer")
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java
new file mode 100644
index 0000000..81d880e
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.AbstractClusteringClient;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.HoodieSparkClusteringClient;
+
+/**
+ * Async clustering service for Spark structured streaming.
+ * Here, async clustering is run in daemon mode to prevent blocking shutting down the Spark application.
+ */
+public class SparkStreamingAsyncClusteringService extends AsyncClusteringService {
+
+ private static final long serialVersionUID = 1L;
+
+ public SparkStreamingAsyncClusteringService(AbstractHoodieWriteClient writeClient) {
+ super(writeClient, true);
+ }
+
+ @Override
+ protected AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client) {
+ return new HoodieSparkClusteringClient(client);
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 76bc99b..b290533 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -62,6 +62,7 @@ object HoodieSparkSqlWriter {
private val log = LogManager.getLogger(getClass)
private var tableExists: Boolean = false
private var asyncCompactionTriggerFnDefined: Boolean = false
+ private var asyncClusteringTriggerFnDefined: Boolean = false
def write(sqlContext: SQLContext,
mode: SaveMode,
@@ -69,9 +70,10 @@ object HoodieSparkSqlWriter {
df: DataFrame,
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
- asyncCompactionTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty
+ asyncCompactionTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty,
+ asyncClusteringTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty
)
- : (Boolean, common.util.Option[String], common.util.Option[String],
+ : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
val sparkContext = sqlContext.sparkContext
@@ -79,6 +81,7 @@ object HoodieSparkSqlWriter {
val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
val tblNameOp = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TABLE_NAME,
s"'${HoodieWriteConfig.TABLE_NAME.key}' must be set.")
asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
+ asyncClusteringTriggerFnDefined = asyncClusteringTriggerFn.isDefined
if (path.isEmpty) {
throw new HoodieException(s"'path' must be set.")
}
@@ -112,7 +115,7 @@ object HoodieSparkSqlWriter {
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not
performing actual writes.")
- (false, common.util.Option.empty(), common.util.Option.empty(),
hoodieWriteClient.orNull, tableConfig)
+ (false, common.util.Option.empty(), common.util.Option.empty(),
common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
} else {
// Handle various save modes
handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs)
@@ -140,7 +143,7 @@ object HoodieSparkSqlWriter {
operation == WriteOperationType.BULK_INSERT) {
val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
basePath, path, instantTime)
- return (success, commitTime, common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
+ return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
}
// scalastyle:on
@@ -180,6 +183,10 @@ object HoodieSparkSqlWriter {
asyncCompactionTriggerFn.get.apply(client)
}
+ if (isAsyncClusteringEnabled(client, parameters)) {
+ asyncClusteringTriggerFn.get.apply(client)
+ }
+
val hoodieRecords =
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS_OPT_KEY)) {
DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
@@ -219,6 +226,10 @@ object HoodieSparkSqlWriter {
asyncCompactionTriggerFn.get.apply(client)
}
+ if (isAsyncClusteringEnabled(client, parameters)) {
+ asyncClusteringTriggerFn.get.apply(client)
+ }
+
// Issue deletes
client.startCommitWithTime(instantTime, commitActionType)
val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
@@ -226,7 +237,7 @@ object HoodieSparkSqlWriter {
}
// Check for errors and commit the write.
- val (writeSuccessful, compactionInstant) =
+ val (writeSuccessful, compactionInstant, clusteringInstant) =
commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, writeResult, parameters, writeClient, tableConfig, jsc,
TableInstantInfo(basePath, instantTime, commitActionType, operation))
@@ -247,7 +258,7 @@ object HoodieSparkSqlWriter {
// it's safe to unpersist cached rdds here
unpersistRdd(writeResult.getWriteStatuses.rdd)
- (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
+ (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
}
}
@@ -565,7 +576,7 @@ object HoodieSparkSqlWriter {
tableConfig: HoodieTableConfig,
jsc: JavaSparkContext,
tableInstantInfo: TableInstantInfo
- ): (Boolean, common.util.Option[java.lang.String]) = {
+ ): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
log.info("Proceeding to commit the write.")
val metaMap = parameters.filter(kv =>
@@ -593,14 +604,24 @@ object HoodieSparkSqlWriter {
log.info(s"Compaction Scheduled is $compactionInstant")
+ val asyncClusteringEnabled = isAsyncClusteringEnabled(client, parameters)
+ val clusteringInstant: common.util.Option[java.lang.String] =
+ if (asyncClusteringEnabled) {
+ client.scheduleClustering(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
+ } else {
+ common.util.Option.empty()
+ }
+
+ log.info(s"Clustering Scheduled is $clusteringInstant")
+
val metaSyncSuccess = metaSync(spark, HoodieWriterUtils.convertMapToHoodieConfig(parameters),
tableInstantInfo.basePath, schema)
log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
- if (!asyncCompactionEnabled) {
+ if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
client.close()
}
- (commitSuccess && metaSyncSuccess, compactionInstant)
+ (commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant)
} else {
log.error(s"${tableInstantInfo.operation} failed with errors")
if (log.isTraceEnabled) {
@@ -615,7 +636,7 @@ object HoodieSparkSqlWriter {
}
})
}
- (false, common.util.Option.empty())
+ (false, common.util.Option.empty(), common.util.Option.empty())
}
}
@@ -631,6 +652,13 @@ object HoodieSparkSqlWriter {
}
}
+ private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
+ parameters: Map[String, String]) : Boolean = {
+ log.info(s"Config.asyncClusteringEnabled ? ${client.getConfig.isAsyncClusteringEnabled}")
+ asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled &&
+ parameters.get(ASYNC_CLUSTERING_ENABLE_OPT_KEY.key).exists(r => r.toBoolean)
+ }
+
private def getHoodieTableConfig(sparkContext: SparkContext,
tablePath: String,
hoodieTableConfigOpt: Option[HoodieTableConfig]): HoodieTableConfig = {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 61cad38..8dfcbc4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
import java.lang
import java.util.function.Function
-import org.apache.hudi.async.{AsyncCompactService, SparkStreamingAsyncCompactService}
+import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.model.HoodieRecordPayload
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.util.CompactionUtils
+import org.apache.hudi.common.util.ClusteringUtils
import org.apache.hudi.exception.HoodieCorruptedDataException
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
@@ -52,6 +53,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY.key).toBoolean
private var isAsyncCompactorServiceShutdownAbnormally = false
+ private var isAsyncClusteringServiceShutdownAbnormally = false
private val mode =
if (outputMode == OutputMode.Append()) {
@@ -61,6 +63,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
}
private var asyncCompactorService : AsyncCompactService = _
+ private var asyncClusteringService: AsyncClusteringService = _
private var writeClient : Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty
private var hoodieTableConfig : Option[HoodieTableConfig] = Option.empty
@@ -68,13 +71,17 @@ class HoodieStreamingSink(sqlContext: SQLContext,
if (isAsyncCompactorServiceShutdownAbnormally) {
throw new IllegalStateException("Async Compactor shutdown unexpectedly")
}
+ if (isAsyncClusteringServiceShutdownAbnormally) {
+ log.error("Async clustering service shutdown unexpectedly")
+ throw new IllegalStateException("Async clustering service shutdown
unexpectedly")
+ }
retry(retryCnt, retryIntervalMs)(
Try(
HoodieSparkSqlWriter.write(
- sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor))
+ sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
) match {
- case Success((true, commitOps, compactionInstantOps, client, tableConfig)) =>
+ case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
log.info(s"Micro batch id=$batchId succeeded"
+ (commitOps.isPresent match {
case true => s" for commit=${commitOps.get()}"
@@ -83,9 +90,14 @@ class HoodieStreamingSink(sqlContext: SQLContext,
writeClient = Some(client)
hoodieTableConfig = Some(tableConfig)
if (compactionInstantOps.isPresent) {
- asyncCompactorService.enqueuePendingCompaction(
+ asyncCompactorService.enqueuePendingAsyncServiceInstant(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
}
+ if (clusteringInstant.isPresent) {
+ asyncClusteringService.enqueuePendingAsyncServiceInstant(new HoodieInstant(
+ State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get()
+ ))
+ }
Success((true, commitOps, compactionInstantOps))
case Failure(e) =>
// clean up persist rdds in the write process
@@ -107,7 +119,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
Failure(e)
}
- case Success((false, commitOps, compactionInstantOps, client, tableConfig)) =>
+ case Success((false, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
log.error(s"Micro batch id=$batchId ended up with errors"
+ (commitOps.isPresent match {
case true => s" for commit=${commitOps.get()}"
@@ -179,7 +191,33 @@ class HoodieStreamingSink(sqlContext: SQLContext,
.setBasePath(client.getConfig.getBasePath).build()
val pendingInstants :java.util.List[HoodieInstant] = CompactionUtils.getPendingCompactionInstantTimes(metaClient)
- pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingCompaction(h))
+ pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingAsyncServiceInstant(h))
+ }
+ }
+
+ protected def triggerAsyncClustering(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]): Unit = {
+ if (null == asyncClusteringService) {
+ log.info("Triggering async clustering!")
+ asyncClusteringService = new SparkStreamingAsyncClusteringService(client)
+ asyncClusteringService.start(new Function[java.lang.Boolean, java.lang.Boolean] {
+ override def apply(errored: lang.Boolean): lang.Boolean = {
+ log.info(s"Async clustering service shutdown. Errored ? $errored")
+ isAsyncClusteringServiceShutdownAbnormally = errored
+ reset(false)
+ true
+ }
+ })
+
+ // Add Shutdown Hook
+ Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
+ override def run(): Unit = reset(true)
+ }))
+
+ // First time, scan .hoodie folder and get all pending clustering instants
+ val metaClient = HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration)
+ .setBasePath(client.getConfig.getBasePath).build()
+ val pendingInstants :java.util.List[HoodieInstant] = ClusteringUtils.getPendingClusteringInstantTimes(metaClient)
+ pendingInstants.foreach((h : HoodieInstant) => asyncClusteringService.enqueuePendingAsyncServiceInstant(h))
}
}
@@ -189,6 +227,11 @@ class HoodieStreamingSink(sqlContext: SQLContext,
asyncCompactorService = null
}
+ if (asyncClusteringService != null) {
+ asyncClusteringService.shutdown(force)
+ asyncClusteringService = null
+ }
+
if (writeClient.isDefined) {
writeClient.get.close()
writeClient = Option.empty
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 586e916..3056103 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -76,6 +76,8 @@ object HoodieWriterUtils {
HIVE_CREATE_MANAGED_TABLE.key() -> HIVE_CREATE_MANAGED_TABLE.defaultValue.toString,
HIVE_SYNC_AS_DATA_SOURCE_TABLE.key() -> HIVE_SYNC_AS_DATA_SOURCE_TABLE.defaultValue(),
ASYNC_COMPACT_ENABLE_OPT_KEY.key -> ASYNC_COMPACT_ENABLE_OPT_KEY.defaultValue,
+ INLINE_CLUSTERING_ENABLE_OPT_KEY.key -> INLINE_CLUSTERING_ENABLE_OPT_KEY.defaultValue,
+ ASYNC_CLUSTERING_ENABLE_OPT_KEY.key -> ASYNC_CLUSTERING_ENABLE_OPT_KEY.defaultValue,
ENABLE_ROW_WRITER_OPT_KEY.key -> ENABLE_ROW_WRITER_OPT_KEY.defaultValue
) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
index d086c2e..966ffb0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -156,6 +156,7 @@ public class HoodieJavaApp {
nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
: SimpleKeyGenerator.class.getCanonicalName())
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "false")
+ .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true")
// This will remove any existing data at path below, and create a
.mode(SaveMode.Overwrite);
@@ -183,6 +184,7 @@ public class HoodieJavaApp {
: SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "false")
+ .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true")
.option(HoodieWriteConfig.TABLE_NAME.key(), tableName).mode(SaveMode.Append);
updateHiveSyncConfig(writer);
@@ -210,6 +212,7 @@ public class HoodieJavaApp {
: SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "false")
+ .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true")
.option(HoodieWriteConfig.TABLE_NAME.key(), tableName).mode(SaveMode.Append);
updateHiveSyncConfig(writer);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index b0f0631..75fa91e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -362,6 +362,7 @@ public class HoodieJavaStreamingApp {
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY().key(), "timestamp")
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY().key(), "true")
+ .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY().key(), "true")
.option(HoodieWriteConfig.TABLE_NAME.key(), tableName).option("checkpointLocation", checkpointLocation)
.outputMode(OutputMode.Append());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 4203882..66cb1ca 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -190,9 +190,13 @@ class TestStructuredStreaming extends HoodieClientTestBase {
numInstants
}
- def getInlineClusteringOpts( isInlineClustering: String, clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = {
+ def getClusteringOpts(isInlineClustering: String, isAsyncClustering: String, isAsyncCompaction: String,
+ clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = {
commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key -> isInlineClustering,
HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key -> clusteringNumCommit,
+ DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key -> isAsyncClustering,
+ DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY.key -> isAsyncCompaction,
+ HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP.key -> clusteringNumCommit,
HoodieStorageConfig.PARQUET_FILE_MAX_BYTES.key -> dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
)
}
@@ -207,12 +211,40 @@ class TestStructuredStreaming extends HoodieClientTestBase {
metaClient.reloadActiveTimeline()
assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
}
- structuredStreamingForTestClusteringRunner(sourcePath, destPath, true,
+ structuredStreamingForTestClusteringRunner(sourcePath, destPath, true, false, false,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
}
@Test
- def testStructuredStreamingWithoutInlineClustering(): Unit = {
+ def testStructuredStreamingWithAsyncClustering(): Unit = {
+ val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
+
+ def checkClusteringResult(destPath: String):Unit = {
+ // check have schedule clustering and clustering file group to one
+ waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+ metaClient.reloadActiveTimeline()
+ assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
+ }
+ structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, false,
+ HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
+ }
+
+ @Test
+ def testStructuredStreamingWithAsyncClusteringAndCompaction(): Unit = {
+ val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
+
+ def checkClusteringResult(destPath: String):Unit = {
+ // check have schedule clustering and clustering file group to one
+ waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+ metaClient.reloadActiveTimeline()
+ assertEquals(1, getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
+ }
+ structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, true, true,
+ HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
+ }
+
+ @Test
+ def testStructuredStreamingWithoutClustering(): Unit = {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", "dest")
def checkClusteringResult(destPath: String):Unit = {
@@ -224,12 +256,13 @@ class TestStructuredStreaming extends HoodieClientTestBase {
}, msg)
println(msg)
}
- structuredStreamingForTestClusteringRunner(sourcePath, destPath, false,
+ structuredStreamingForTestClusteringRunner(sourcePath, destPath, false, false, false,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
}
- def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String,
- isInlineClustering: Boolean, partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = {
+ def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath: String, isInlineClustering: Boolean,
+ isAsyncClustering: Boolean, isAsyncCompaction: Boolean,
+ partitionOfRecords: String, checkClusteringResult: String => Unit): Unit = {
// First insert of data
val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000", 100, partitionOfRecords)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
@@ -238,7 +271,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001", 100, partitionOfRecords)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
- val hudiOptions = getInlineClusteringOpts(isInlineClustering.toString, "2", 100)
+ val hudiOptions = getClusteringOpts(isInlineClustering.toString, isAsyncClustering.toString,
+ isAsyncCompaction.toString, "2", 100)
val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath, hudiOptions)
val f2 = Future {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 2aad344..7742e8e 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -91,8 +92,10 @@ import java.util.stream.Collectors;
import scala.collection.JavaConversions;
import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP;
+import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
+import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING_PROP;
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_PROP;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP;
@@ -645,6 +648,9 @@ public class DeltaSync implements Serializable {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName)
// Inline compaction is disabled for continuous mode. otherwise enabled for MOR
.withInlineCompaction(cfg.isInlineCompactionEnabled()).build())
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withInlineClustering(cfg.isInlineClusteringEnabled())
+ .withAsyncClustering(cfg.isAsyncClusteringEnabled()).build())
.withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
.build())
.forTable(cfg.targetTableName)
@@ -663,6 +669,10 @@ public class DeltaSync implements Serializable {
// Validate what deltastreamer assumes of write-config to be really safe
ValidationUtils.checkArgument(config.inlineCompactionEnabled() == cfg.isInlineCompactionEnabled(),
String.format("%s should be set to %s", INLINE_COMPACT_PROP.key(), cfg.isInlineCompactionEnabled()));
+ ValidationUtils.checkArgument(config.inlineClusteringEnabled() == cfg.isInlineClusteringEnabled(),
+ String.format("%s should be set to %s", INLINE_CLUSTERING_PROP.key(), cfg.isInlineClusteringEnabled()));
+ ValidationUtils.checkArgument(config.isAsyncClusteringEnabled() == cfg.isAsyncClusteringEnabled(),
+ String.format("%s should be set to %s", ASYNC_CLUSTERING_ENABLE_OPT_KEY.key(), cfg.isAsyncClusteringEnabled()));
ValidationUtils.checkArgument(!config.shouldAutoCommit(),
String.format("%s should be set to %s", HOODIE_AUTO_COMMIT_PROP.key(), autoCommit));
ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes,
@@ -736,4 +746,14 @@ public class DeltaSync implements Serializable {
public Option<HoodieTimeline> getCommitTimelineOpt() {
return commitTimelineOpt;
}
+
+ /**
+ * Schedule clustering.
+ * Called from {@link HoodieDeltaStreamer} when async clustering is enabled.
+ *
+ * @return Requested clustering instant.
+ */
+ public Option<String> getClusteringInstantOpt() {
+ return writeClient.scheduleClustering(Option.empty());
+ }
}
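
A sketch of how the continuous-mode driver is expected to consume this hook. The HoodieDeltaStreamer changes below are truncated in this mail, so the exact wiring there may differ; this simply mirrors the existing compaction flow, and deltaSync, asyncClusteringService, and cfg are placeholders:

    // inside the delta-sync loop, when async clustering is enabled
    Option<String> clusteringInstant = deltaSync.getClusteringInstantOpt();
    if (clusteringInstant.isPresent()) {
      asyncClusteringService.enqueuePendingAsyncServiceInstant(new HoodieInstant(
          State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get()));
    }
    // throttle ingestion while too many clustering plans are outstanding
    asyncClusteringService.waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering);
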
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index a1066c7..9734a1d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -18,8 +18,10 @@
package org.apache.hudi.utilities.deltastreamer;
-import org.apache.hudi.async.HoodieAsyncService;
+import org.apache.hudi.async.AsyncClusteringService;
import org.apache.hudi.async.AsyncCompactService;
+import org.apache.hudi.async.HoodieAsyncService;
+import org.apache.hudi.async.SparkAsyncClusteringService;
import org.apache.hudi.async.SparkAsyncCompactService;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -35,15 +37,17 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.utilities.HiveIncrementalPuller;
+import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -282,6 +286,11 @@ public class HoodieDeltaStreamer implements Serializable {
+ "outstanding compactions is less than this number")
public Integer maxPendingCompactions = 5;
+ @Parameter(names = {"--max-pending-clustering"},
+ description = "Maximum number of outstanding inflight/requested clustering. Delta Sync will not happen unless"
+ + " outstanding clustering is less than this number")
+ public Integer maxPendingClustering = 5;
+
@Parameter(names = {"--continuous"}, description = "Delta Streamer runs in
continuous mode running"
+ " source-fetch -> Transform -> Hudi Write in loop")
public Boolean continuousMode = false;
@@ -351,6 +360,16 @@ public class HoodieDeltaStreamer implements Serializable {
&&
HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
}
+ public boolean isAsyncClusteringEnabled() {
+ return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getConfig()
+ .getOrDefault(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key(), false)));
+ }
+
+ public boolean isInlineClusteringEnabled() {
+ return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getConfig()
+ .getOrDefault(HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), false)));
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -376,6 +395,7 @@ public class HoodieDeltaStreamer implements Serializable {
&& Objects.equals(filterDupes, config.filterDupes)
&& Objects.equals(enableHiveSync, config.enableHiveSync)
&& Objects.equals(maxPendingCompactions, config.maxPendingCompactions)
+ && Objects.equals(maxPendingClustering, config.maxPendingClustering)
&& Objects.equals(continuousMode, config.continuousMode)
&& Objects.equals(minSyncIntervalSeconds, config.minSyncIntervalSeconds)
&& Objects.equals(sparkMaster, config.sparkMaster)
@@ -396,7 +416,7 @@ public class HoodieDeltaStreamer implements Serializable {
baseFileFormat, propsFilePath, configs, sourceClassName,
sourceOrderingField, payloadClassName, schemaProviderClassName,
transformerClassNames, sourceLimit, operation, filterDupes,
- enableHiveSync, maxPendingCompactions, continuousMode,
+ enableHiveSync, maxPendingCompactions, maxPendingClustering, continuousMode,
minSyncIntervalSeconds, sparkMaster, commitOnErrors,
deltaSyncSchedulingWeight, compactSchedulingWeight, deltaSyncSchedulingMinShare,
compactSchedulingMinShare, forceDisableCompaction, checkpoint,
@@ -422,6 +442,7 @@ public class HoodieDeltaStreamer implements Serializable {
+ ", filterDupes=" + filterDupes
+ ", enableHiveSync=" + enableHiveSync
+ ", maxPendingCompactions=" + maxPendingCompactions
+ + ", maxPendingClustering=" + maxPendingClustering
+ ", continuousMode=" + continuousMode
+ ", minSyncIntervalSeconds=" + minSyncIntervalSeconds
+ ", sparkMaster='" + sparkMaster + '\''
@@ -520,6 +541,11 @@ public class HoodieDeltaStreamer implements Serializable {
private Option<AsyncCompactService> asyncCompactService;
/**
+ * Async clustering service.
+ */
+ private Option<AsyncClusteringService> asyncClusteringService;
+
+ /**
* Table Type.
*/
private final HoodieTableType tableType;
@@ -535,6 +561,7 @@ public class HoodieDeltaStreamer implements Serializable {
this.jssc = jssc;
this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate();
this.asyncCompactService = Option.empty();
+ this.asyncClusteringService = Option.empty();
if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta =
@@ -598,9 +625,17 @@ public class HoodieDeltaStreamer implements Serializable {
Option<Pair<Option<String>, JavaRDD<WriteStatus>>> scheduledCompactionInstantAndRDD = Option.ofNullable(deltaSync.syncOnce());
if (scheduledCompactionInstantAndRDD.isPresent() && scheduledCompactionInstantAndRDD.get().getLeft().isPresent()) {
LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstantAndRDD.get().getLeft() + ")");
- asyncCompactService.get().enqueuePendingCompaction(new HoodieInstant(State.REQUESTED,
+ asyncCompactService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(State.REQUESTED,
HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.get().getLeft().get()));
- asyncCompactService.get().waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
+ asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions);
+ }
+ if (cfg.isAsyncClusteringEnabled()) {
+ Option<String> clusteringInstant = deltaSync.getClusteringInstantOpt();
+ if (clusteringInstant.isPresent()) {
+ LOG.info("Scheduled async clustering for instant: " + clusteringInstant.get());
+ asyncClusteringService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get()));
+ asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering);
+ }
}
long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start);
if (toSleepMs > 0) {
@@ -615,21 +650,25 @@ public class HoodieDeltaStreamer implements Serializable {
}
}
} finally {
- shutdownCompactor(error);
+ shutdownAsyncServices(error);
}
return true;
}, executor), executor);
}
/**
- * Shutdown compactor as DeltaSync is shutdown.
+ * Shutdown async services like compaction/clustering as DeltaSync is shutdown.
*/
- private void shutdownCompactor(boolean error) {
+ private void shutdownAsyncServices(boolean error) {
LOG.info("Delta Sync shutdown. Error ?" + error);
if (asyncCompactService.isPresent()) {
LOG.warn("Gracefully shutting down compactor");
asyncCompactService.get().shutdown(false);
}
+ if (asyncClusteringService.isPresent()) {
+ LOG.warn("Gracefully shutting down clustering service");
+ asyncClusteringService.get().shutdown(false);
+ }
}
/**
@@ -649,19 +688,43 @@ public class HoodieDeltaStreamer implements Serializable {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
- pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingCompaction(hoodieInstant));
+ pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingAsyncServiceInstant(hoodieInstant));
asyncCompactService.get().start((error) -> {
// Shutdown DeltaSync
shutdown(false);
return true;
});
try {
- asyncCompactService.get().waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
+ asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions);
} catch (InterruptedException ie) {
throw new HoodieException(ie);
}
}
}
+ // start async clustering if required
+ if (cfg.isAsyncClusteringEnabled()) {
+ if (asyncClusteringService.isPresent()) {
+ asyncClusteringService.get().updateWriteClient(writeClient);
+ } else {
+ asyncClusteringService = Option.ofNullable(new SparkAsyncClusteringService(writeClient));
+ HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
+ .setConf(new Configuration(jssc.hadoopConfiguration()))
+ .setBasePath(cfg.targetBasePath)
+ .setLoadActiveTimelineOnLoad(true).build();
+ List<HoodieInstant> pending = ClusteringUtils.getPendingClusteringInstantTimes(meta);
+ LOG.info(String.format("Found %d pending clustering instants ", pending.size()));
+ pending.forEach(hoodieInstant -> asyncClusteringService.get().enqueuePendingAsyncServiceInstant(hoodieInstant));
+ asyncClusteringService.get().start((error) -> {
+ shutdown(false);
+ return true;
+ });
+ try {
+ asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering);
+ } catch (InterruptedException e) {
+ throw new HoodieException(e);
+ }
+ }
+ }
return true;
}
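
Taken together, the continuous-mode changes reduce to a schedule/enqueue/block cycle per sync round. A condensed sketch of that contract follows; every method it calls appears in this patch, while the method name and the surrounding service wiring are assumptions for illustration:

    // Condensed from the continuous-mode loop above (hypothetical helper name).
    private void scheduleAndThrottleClustering() throws InterruptedException {
      // Ask DeltaSync to schedule clustering for this round.
      Option<String> clusteringInstant = deltaSync.getClusteringInstantOpt();
      if (clusteringInstant.isPresent()) {
        // Hand the requested replace-commit instant to the async service ...
        asyncClusteringService.get().enqueuePendingAsyncServiceInstant(
            new HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get()));
        // ... then block until the backlog drops below --max-pending-clustering (default 5).
        asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering);
      }
    }
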
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index eb534c4..ddf03cb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -193,6 +193,7 @@ public class HoodieMultiTableDeltaStreamer {
tableConfig.payloadClassName = globalConfig.payloadClassName;
tableConfig.forceDisableCompaction = globalConfig.forceDisableCompaction;
tableConfig.maxPendingCompactions = globalConfig.maxPendingCompactions;
+ tableConfig.maxPendingClustering = globalConfig.maxPendingClustering;
tableConfig.minSyncIntervalSeconds = globalConfig.minSyncIntervalSeconds;
tableConfig.transformerClassNames = globalConfig.transformerClassNames;
tableConfig.commitOnErrors = globalConfig.commitOnErrors;
@@ -296,6 +297,11 @@ public class HoodieMultiTableDeltaStreamer {
+ "outstanding compactions is less than this number")
public Integer maxPendingCompactions = 5;
+ @Parameter(names = {"--max-pending-clustering"},
+ description = "Maximum number of outstanding inflight/requested clustering. Delta Sync will not happen unless"
+ + " outstanding clustering is less than this number")
+ public Integer maxPendingClustering = 5;
+
@Parameter(names = {"--continuous"}, description = "Delta Streamer runs in continuous mode running"
+ " source-fetch -> Transform -> Hudi Write in loop")
public Boolean continuousMode = false;
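
The multi-table change is pure propagation: the single global knob fans out to every per-table config. A short hypothetical sketch of the effect (field names as in the patch, values illustrative):

    // One global --max-pending-clustering value governs all tables; the copy in
    // HoodieMultiTableDeltaStreamer above applies it to each table's config.
    HoodieDeltaStreamer.Config tableConfig = new HoodieDeltaStreamer.Config();
    tableConfig.maxPendingClustering = 2; // illustrative; the default stays 5
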
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 8a2648d..642c666 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.exception.HoodieException;
@@ -500,6 +501,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
int numDeltaCommits = (int) timeline.getInstants().count();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
}
+
+ static void assertAtLeastNReplaceCommits(int minExpected, String tablePath, FileSystem fs) {
+ HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
+ HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline();
+ LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
+ // Count completed replace (clustering) commits on the active timeline
+ int numReplaceCommits = (int) timeline.getInstants().count();
+ assertTrue(minExpected <= numReplaceCommits, "Got=" + numReplaceCommits + ", exp >=" + minExpected);
+ }
}
@Test
@@ -987,20 +996,35 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
- cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
- cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
- cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), "true"));
- cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key(), "2"));
+ cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
- HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
- int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
- int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
- LOG.info("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize);
- return completeReplaceSize > 0;
+ TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+ return true;
});
- HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
- assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length);
+ }
+
+ private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster,
+ String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) {
+ List<String> configs = new ArrayList<>();
+ configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+ if (!StringUtils.isNullOrEmpty(autoClean)) {
+ configs.add(String.format("%s=%s", HoodieCompactionConfig.AUTO_CLEAN_PROP.key(), autoClean));
+ }
+ if (!StringUtils.isNullOrEmpty(inlineCluster)) {
+ configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), inlineCluster));
+ }
+ if (!StringUtils.isNullOrEmpty(inlineClusterMaxCommit)) {
+ configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key(), inlineClusterMaxCommit));
+ }
+ if (!StringUtils.isNullOrEmpty(asyncCluster)) {
+ configs.add(String.format("%s=%s", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key(), asyncCluster));
+ }
+ if (!StringUtils.isNullOrEmpty(asyncClusterMaxCommit)) {
+ configs.add(String.format("%s=%s", HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP.key(), asyncClusterMaxCommit));
+ }
+ return configs;
}
private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
@@ -1023,9 +1047,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
cfg.continuousMode = true;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
- cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
- cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
- cfg.configs.add(String.format("%s=true", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY.key()));
+ cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", ""));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
@@ -1049,14 +1071,48 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
} else {
LOG.warn("Schedule clustering failed");
}
- HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
- int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
- int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
- System.out.println("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize);
- return completeReplaceSize > 0;
+ TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+ return true;
+ });
+ }
+
+ @Test
+ public void testAsyncClusteringService() throws Exception {
+ String tableBasePath = dfsBasePath + "/asyncClustering";
+ // Keep it higher than batch-size to test continuous mode
+ int totalRecords = 3000;
+
+ // Initial bulk insert
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
+ cfg.continuousMode = true;
+ cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+ cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2"));
+ HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+ deltaStreamerTestRunner(ds, cfg, (r) -> {
+ TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+ return true;
+ });
+ }
+
+ @Test
+ public void testAsyncClusteringServiceWithCompaction() throws Exception {
+ String tableBasePath = dfsBasePath + "/asyncClusteringCompaction";
+ // Keep it higher than batch-size to test continuous mode
+ int totalRecords = 3000;
+
+ // Initial bulk insert
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
+ cfg.continuousMode = true;
+ cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
+ cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2"));
+ HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+ deltaStreamerTestRunner(ds, cfg, (r) -> {
+ TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+ TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
+ TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+ return true;
});
- HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
- assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length);
}
/**