This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9c369c6 [HUDI-1757] Assigns the buckets by record key for Flink
writer (#2757)
9c369c6 is described below
commit 9c369c607df2816ea2cd1221fb6d879e3fb8f74c
Author: Danny Chan <[email protected]>
AuthorDate: Tue Apr 6 19:06:41 2021 +0800
[HUDI-1757] Assigns the buckets by record key for Flink writer (#2757)
Currently we assign the buckets by record partition path which could
cause hotspot if the partition field is datetime type. Changes to assign
buckets by grouping the records with their key first; the assignment is
valid only if there is no conflict (two tasks writing to the same bucket).
This patch also changes the coordinator execution to be asynchronous.
---
.../org/apache/hudi/index/HoodieIndexUtils.java | 1 -
.../apache/hudi/client/HoodieFlinkWriteClient.java | 12 +-
.../apache/hudi/common/util/ReflectionUtils.java | 2 +-
.../apache/hudi/configuration/FlinkOptions.java | 8 +-
.../java/org/apache/hudi/sink/CleanFunction.java | 8 +-
.../org/apache/hudi/sink/StreamWriteFunction.java | 48 +---
.../hudi/sink/StreamWriteOperatorCoordinator.java | 254 +++++++++------------
.../apache/hudi/sink/compact/CompactFunction.java | 42 +++-
.../hudi/sink/compact/CompactionCommitSink.java | 52 ++---
.../sink/partitioner/BucketAssignFunction.java | 90 ++------
.../hudi/sink/partitioner/BucketAssigner.java | 25 +-
.../hudi/sink/partitioner/BucketAssigners.java | 8 +-
.../partitioner/delta/DeltaBucketAssigner.java | 20 +-
.../sink/transform/RowDataToHoodieFunction.java | 75 +++++-
.../hudi/sink/utils/CoordinatorExecutor.java | 57 +++++
.../apache/hudi/sink/utils/NonThrownExecutor.java | 22 +-
.../org/apache/hudi/table/HoodieTableSink.java | 4 +-
.../java/org/apache/hudi/util/StreamerUtil.java | 20 +-
.../org/apache/hudi/sink/StreamWriteITCase.java | 12 +-
.../sink/TestStreamWriteOperatorCoordinator.java | 68 ++++--
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 23 +-
.../hudi/sink/partitioner/TestBucketAssigner.java | 114 ++++++++-
.../hudi/sink/utils/CompactFunctionWrapper.java | 5 +
.../hudi/sink/utils/MockCoordinatorExecutor.java | 51 +++++
.../sink/utils/StreamWriteFunctionWrapper.java | 15 +-
25 files changed, 637 insertions(+), 399 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 49c81b7..e5426ca 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -41,7 +41,6 @@ public class HoodieIndexUtils {
* Fetches Pair of partition path and {@link HoodieBaseFile}s for interested
partitions.
*
* @param partition Partition of interest
- * @param context Instance of {@link HoodieEngineContext} to use
* @param hoodieTable Instance of {@link HoodieTable} of interest
* @return the list of {@link HoodieBaseFile}
*/
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 6a6bced..7eeec4c 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -260,11 +260,17 @@ public class HoodieFlinkWriteClient<T extends
HoodieRecordPayload> extends
* but cleaning action should trigger after all the write actions within a
* checkpoint finish.
*
- * @param instantTime The latest successful commit time
+ * @param table Table to commit on
+ * @param metadata Commit Metadata corresponding to committed instant
+ * @param instantTime Instant Time
+ * @param extraMetadata Additional Metadata passed by user
*/
- public void postCommit(String instantTime) {
+ @Override
+ protected void postCommit(HoodieTable<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>> table,
+ HoodieCommitMetadata metadata,
+ String instantTime,
+ Option<Map<String, String>> extraMetadata) {
try {
- HoodieTable<?, ?, ?, ?> table = createTable(config, hadoopConf);
// Delete the marker directory for the instant.
new MarkerFiles(createTable(config, hadoopConf), instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
index 23a87e7..bb30b2a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
@@ -47,7 +47,7 @@ public class ReflectionUtils {
private static Map<String, Class<?>> clazzCache = new HashMap<>();
- private static Class<?> getClass(String clazzName) {
+ public static Class<?> getClass(String clazzName) {
if (!clazzCache.containsKey(clazzName)) {
try {
Class<?> clazz = Class.forName(clazzName);
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 8691d91..b66be2b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -255,7 +255,7 @@ public class FlinkOptions {
public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
.key("write.batch.size.MB")
.doubleType()
- .defaultValue(128D) // 128MB
+ .defaultValue(2D) // 2MB
.withDescription("Batch buffer size in MB to flush data into the
underneath filesystem");
// ------------------------------------------------------------------------
@@ -294,6 +294,12 @@ public class FlinkOptions {
.defaultValue(3600) // default 1 hour
.withDescription("Max delta seconds time needed to trigger compaction,
default 1 hour");
+ public static final ConfigOption<Integer> COMPACTION_MAX_MEMORY =
ConfigOptions
+ .key("compaction.max_memory")
+ .intType()
+ .defaultValue(100) // default 100 MB
+ .withDescription("Max memory in MB for compaction spillable map, default
100MB");
+
public static final ConfigOption<Boolean> CLEAN_ASYNC_ENABLED = ConfigOptions
.key("clean.async.enabled")
.booleanType()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 7b875ff..14dd827 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -30,6 +30,8 @@ import
org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Sink function that cleans the old commits.
@@ -40,6 +42,8 @@ import
org.apache.flink.streaming.api.functions.sink.SinkFunction;
*/
public class CleanFunction<T> extends AbstractRichFunction
implements SinkFunction<T>, CheckpointedFunction, CheckpointListener {
+ private static final Logger LOG =
LoggerFactory.getLogger(CleanFunction.class);
+
private final Configuration conf;
private HoodieFlinkWriteClient writeClient;
@@ -56,7 +60,7 @@ public class CleanFunction<T> extends AbstractRichFunction
super.open(parameters);
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
this.writeClient = StreamerUtil.createWriteClient(conf,
getRuntimeContext());
- this.executor = new NonThrownExecutor();
+ this.executor = new NonThrownExecutor(LOG);
}
}
@@ -70,7 +74,7 @@ public class CleanFunction<T> extends AbstractRichFunction
// ensure to switch the isCleaning flag
this.isCleaning = false;
}
- }, "wait for cleaning finish", "");
+ }, "wait for cleaning finish");
}
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 364d28e..b0321ac 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -31,9 +31,9 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
-import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -50,8 +50,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
/**
@@ -104,21 +102,6 @@ public class StreamWriteFunction<K, I, O>
private transient Map<String, DataBucket> buckets;
/**
- * The buffer lock to control data buffering/flushing.
- */
- private transient ReentrantLock bufferLock;
-
- /**
- * The condition to decide whether to add new records into the buffer.
- */
- private transient Condition addToBufferCondition;
-
- /**
- * Flag saying whether there is an on-going checkpoint.
- */
- private volatile boolean onCheckpointing = false;
-
- /**
* Config options.
*/
private final Configuration config;
@@ -169,32 +152,15 @@ public class StreamWriteFunction<K, I, O>
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
- bufferLock.lock();
- try {
- // Based on the fact that the coordinator starts the checkpoint first,
- // it would check the validity.
- this.onCheckpointing = true;
- // wait for the buffer data flush out and request a new instant
- flushRemaining(false);
- // signal the task thread to start buffering
- addToBufferCondition.signal();
- } finally {
- this.onCheckpointing = false;
- bufferLock.unlock();
- }
+ // Based on the fact that the coordinator starts the checkpoint first,
+ // it would check the validity.
+ // wait for the buffer data flush out and request a new instant
+ flushRemaining(false);
}
@Override
public void processElement(I value, KeyedProcessFunction<K, I, O>.Context
ctx, Collector<O> out) throws Exception {
- bufferLock.lock();
- try {
- if (onCheckpointing) {
- addToBufferCondition.await();
- }
- bufferRecord(value);
- } finally {
- bufferLock.unlock();
- }
+ bufferRecord(value);
}
@Override
@@ -247,8 +213,6 @@ public class StreamWriteFunction<K, I, O>
private void initBuffer() {
this.buckets = new LinkedHashMap<>();
- this.bufferLock = new ReentrantLock();
- this.addToBufferCondition = this.bufferLock.newCondition();
}
private void initWriteFunction() {
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 51149e2..54a7603 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -26,27 +26,20 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
+import org.apache.hudi.sink.utils.CoordinatorExecutor;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.util.Preconditions;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -64,7 +57,7 @@ import static
org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
* <p>This coordinator starts a new instant when a new checkpoint starts. It
commits the instant when all the
* operator tasks write the buffer successfully for a round of checkpoint.
*
- * <p>If there is no data for a round of checkpointing, it rolls back the
metadata.
+ * <p>If there is no data for a round of checkpointing, it resets the events
buffer and returns early.
*
* @see StreamWriteFunction for the work flow and semantics
*/
@@ -78,19 +71,19 @@ public class StreamWriteOperatorCoordinator
private final Configuration conf;
/**
- * Write client.
+ * Coordinator context.
*/
- private transient HoodieFlinkWriteClient writeClient;
+ private final Context context;
/**
- * Current data buffering checkpoint.
+ * Write client.
*/
- private long inFlightCheckpoint = -1;
+ private transient HoodieFlinkWriteClient writeClient;
/**
* Current REQUESTED instant, for validation.
*/
- private String instant = "";
+ private volatile String instant = "";
/**
* Event buffer for one round of checkpointing. When all the elements are
non-null and have the same
@@ -111,7 +104,12 @@ public class StreamWriteOperatorCoordinator
/**
* A single-thread executor to handle all the asynchronous jobs of the
coordinator.
*/
- private NonThrownExecutor executor;
+ private CoordinatorExecutor executor;
+
+ /**
+ * A single-thread executor to handle asynchronous hive sync.
+ */
+ private NonThrownExecutor hiveSyncExecutor;
/**
* Context that holds variables for asynchronous hive sync.
@@ -121,14 +119,15 @@ public class StreamWriteOperatorCoordinator
/**
* Constructs a StreamingSinkOperatorCoordinator.
*
- * @param conf The config options
- * @param parallelism The operator task number
+ * @param conf The config options
+ * @param context The coordinator context
*/
public StreamWriteOperatorCoordinator(
Configuration conf,
- int parallelism) {
+ Context context) {
this.conf = conf;
- this.parallelism = parallelism;
+ this.context = context;
+ this.parallelism = context.currentParallelism();
this.needsScheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
}
@@ -142,6 +141,8 @@ public class StreamWriteOperatorCoordinator
initTableIfNotExists(this.conf);
// start a new instant
startInstant();
+ // start the executor
+ this.executor = new CoordinatorExecutor(this.context, LOG);
// start the executor if required
if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
initHiveSync();
@@ -162,38 +163,46 @@ public class StreamWriteOperatorCoordinator
@Override
public void checkpointCoordinator(long checkpointId,
CompletableFuture<byte[]> result) {
- try {
- this.inFlightCheckpoint = checkpointId;
- result.complete(writeCheckpointBytes());
- } catch (Throwable throwable) {
- // when a checkpoint fails, throws directly.
- result.completeExceptionally(
- new CompletionException(
- String.format("Failed to checkpoint Instant %s for source %s",
- this.instant, this.getClass().getSimpleName()), throwable));
- }
+ executor.execute(
+ () -> {
+ try {
+ result.complete(new byte[0]);
+ } catch (Throwable throwable) {
+ // when a checkpoint fails, throws directly.
+ result.completeExceptionally(
+ new CompletionException(
+ String.format("Failed to checkpoint Instant %s for source
%s",
+ this.instant, this.getClass().getSimpleName()),
throwable));
+ }
+ }, "taking checkpoint %d", checkpointId
+ );
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
- // start to commit the instant.
- final String errorMsg = String.format("Instant [%s] has a complete
checkpoint [%d],\n"
- + "but the coordinator has not received full write success events,\n"
- + "rolls back the instant and rethrow", this.instant, checkpointId);
- checkAndForceCommit(errorMsg);
- // if async compaction is on, schedule the compaction
- if (needsScheduleCompaction) {
- writeClient.scheduleCompaction(Option.empty());
- }
+ executor.execute(
+ () -> {
+ // for streaming mode, commits the ever received events anyway,
+ // the stream write task snapshot and flush the data buffer
synchronously in sequence,
+ // so a successful checkpoint subsumes the old one(follows the
checkpoint subsuming contract)
+ final boolean committed = commitInstant();
+ if (committed) {
+ // if async compaction is on, schedule the compaction
+ if (needsScheduleCompaction) {
+ writeClient.scheduleCompaction(Option.empty());
+ }
+ // start new instant.
+ startInstant();
+ }
+ }, "commits the instant %s", this.instant
+ );
// sync Hive if is enabled
syncHiveIfEnabled();
- // start new instant.
- startInstant();
}
private void syncHiveIfEnabled() {
if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
- this.executor.execute(this::syncHive, "sync hive metadata",
this.instant);
+ this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for
instant %s", this.instant);
}
}
@@ -211,43 +220,38 @@ public class StreamWriteOperatorCoordinator
this.conf.getString(FlinkOptions.TABLE_NAME),
conf.getString(FlinkOptions.TABLE_TYPE));
}
- public void notifyCheckpointAborted(long checkpointId) {
- Preconditions.checkState(inFlightCheckpoint == checkpointId,
- "The aborted checkpoint should always be the last checkpoint");
- checkAndForceCommit("The last checkpoint was aborted, roll back the last
write and throw");
- }
-
@Override
public void resetToCheckpoint(long checkpointID, @Nullable byte[]
checkpointData) throws Exception {
- if (checkpointData != null) {
- // restore when any checkpoint completed
- deserializeCheckpointAndRestore(checkpointData);
- }
+ // no operation
}
@Override
public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
- // no event to handle
- ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
- "The coordinator can only handle BatchWriteSuccessEvent");
- BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent;
- // the write task does not block after checkpointing(and before it
receives a checkpoint success event),
- // if it it checkpoints succeed then flushes the data buffer again before
this coordinator receives a checkpoint
- // success event, the data buffer would flush with an older instant time.
- ValidationUtils.checkState(
- HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
- String.format("Receive an unexpected event for instant %s from task
%d",
- event.getInstantTime(), event.getTaskID()));
- if (this.eventBuffer[event.getTaskID()] != null) {
- this.eventBuffer[event.getTaskID()].mergeWith(event);
- } else {
- this.eventBuffer[event.getTaskID()] = event;
- }
- if (event.isEndInput() && checkReady()) {
- // start to commit the instant.
- doCommit();
- // no compaction scheduling for batch mode
- }
+ executor.execute(
+ () -> {
+ // no event to handle
+ ValidationUtils.checkState(operatorEvent instanceof
BatchWriteSuccessEvent,
+ "The coordinator can only handle BatchWriteSuccessEvent");
+ BatchWriteSuccessEvent event = (BatchWriteSuccessEvent)
operatorEvent;
+ // the write task does not block after checkpointing(and before it
receives a checkpoint success event),
+ // if it it checkpoints succeed then flushes the data buffer again
before this coordinator receives a checkpoint
+ // success event, the data buffer would flush with an older instant
time.
+ ValidationUtils.checkState(
+ HoodieTimeline.compareTimestamps(instant,
HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
+ String.format("Receive an unexpected event for instant %s from
task %d",
+ event.getInstantTime(), event.getTaskID()));
+ if (this.eventBuffer[event.getTaskID()] != null) {
+ this.eventBuffer[event.getTaskID()].mergeWith(event);
+ } else {
+ this.eventBuffer[event.getTaskID()] = event;
+ }
+ if (event.isEndInput() && allEventsReceived()) {
+ // start to commit the instant.
+ commitInstant();
+ // no compaction scheduling for batch mode
+ }
+ }, "handle write success event for instant %s", this.instant
+ );
}
@Override
@@ -265,85 +269,31 @@ public class StreamWriteOperatorCoordinator
// -------------------------------------------------------------------------
private void initHiveSync() {
- this.executor = new NonThrownExecutor();
+ this.hiveSyncExecutor = new NonThrownExecutor(LOG);
this.hiveSyncContext = HiveSyncContext.create(conf);
}
- static byte[] readBytes(DataInputStream in, int size) throws IOException {
- byte[] bytes = new byte[size];
- in.readFully(bytes);
- return bytes;
- }
-
- /**
- * Serialize the coordinator state. The current implementation may not be
super efficient,
- * but it should not matter that much because most of the state should be
rather small.
- * Large states themselves may already be a problem regardless of how the
serialization
- * is implemented.
- *
- * @return A byte array containing the serialized state of the source
coordinator.
- * @throws IOException When something goes wrong in serialization.
- */
- private byte[] writeCheckpointBytes() throws IOException {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
-
- out.writeLong(this.inFlightCheckpoint);
- byte[] serializedInstant = this.instant.getBytes();
- out.writeInt(serializedInstant.length);
- out.write(serializedInstant);
- out.flush();
- return baos.toByteArray();
- }
- }
-
- /**
- * Restore the state of this source coordinator from the state bytes.
- *
- * @param bytes The checkpoint bytes that was returned from {@link
#writeCheckpointBytes()}
- * @throws Exception When the deserialization failed.
- */
- private void deserializeCheckpointAndRestore(byte[] bytes) throws Exception {
- try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
- DataInputStream in = new DataInputViewStreamWrapper(bais)) {
- long checkpointID = in.readLong();
- int serializedInstantSize = in.readInt();
- byte[] serializedInstant = readBytes(in, serializedInstantSize);
- this.inFlightCheckpoint = checkpointID;
- this.instant = new String(serializedInstant);
- }
- }
-
private void reset() {
- this.instant = "";
this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism];
}
- private void checkAndForceCommit(String errMsg) {
- if (!checkReady()) {
- // forced but still has inflight instant
- String inflightInstant =
writeClient.getInflightAndRequestedInstant(this.conf.getString(FlinkOptions.TABLE_TYPE));
- if (inflightInstant != null) {
- assert inflightInstant.equals(this.instant);
- writeClient.rollback(this.instant);
- throw new HoodieException(errMsg);
- }
- if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
- // The last checkpoint finished successfully.
- return;
- }
- }
- doCommit();
- }
-
/** Checks the buffer is ready to commit. */
- private boolean checkReady() {
+ private boolean allEventsReceived() {
return Arrays.stream(eventBuffer)
.allMatch(event -> event != null && event.isReady(this.instant));
}
- /** Performs the actual commit action. */
- private void doCommit() {
+ /**
+ * Commits the instant.
+ *
+ * @return true if the write statuses are committed successfully.
+ */
+ private boolean commitInstant() {
+ if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
+ // The last checkpoint finished successfully.
+ return false;
+ }
+
List<WriteStatus> writeResults = Arrays.stream(eventBuffer)
.filter(Objects::nonNull)
.map(BatchWriteSuccessEvent::getWriteStatuses)
@@ -351,12 +301,16 @@ public class StreamWriteOperatorCoordinator
.collect(Collectors.toList());
if (writeResults.size() == 0) {
- // No data has written, clear the metadata file
-
this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE),
this.instant);
+ // No data has written, reset the buffer and returns early
reset();
- return;
+ return false;
}
+ doCommit(writeResults);
+ return true;
+ }
+ /** Performs the actual commit action. */
+ private void doCommit(List<WriteStatus> writeResults) {
// commit or rollback
long totalErrorRecords =
writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
long totalRecords =
writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
@@ -371,7 +325,6 @@ public class StreamWriteOperatorCoordinator
boolean success = writeClient.commit(this.instant, writeResults,
Option.of(checkpointCommitMetadata));
if (success) {
- writeClient.postCommit(this.instant);
reset();
LOG.info("Commit instant [{}] success!", this.instant);
} else {
@@ -409,6 +362,19 @@ public class StreamWriteOperatorCoordinator
return writeClient;
}
+ @VisibleForTesting
+ public Context getContext() {
+ return context;
+ }
+
+ @VisibleForTesting
+ public void setExecutor(CoordinatorExecutor executor) throws Exception {
+ if (this.executor != null) {
+ this.executor.close();
+ }
+ this.executor = executor;
+ }
+
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
@@ -432,7 +398,7 @@ public class StreamWriteOperatorCoordinator
@Override
public OperatorCoordinator create(Context context) {
- return new StreamWriteOperatorCoordinator(this.conf,
context.currentParallelism());
+ return new StreamWriteOperatorCoordinator(this.conf, context);
}
}
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
index 7f4f7b9..ee8678b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
@@ -21,13 +21,17 @@ package org.apache.hudi.sink.compact;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import
org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
import org.apache.hudi.util.StreamerUtil;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
@@ -36,6 +40,7 @@ import java.util.List;
* In order to execute scalable, the input should shuffle by the compact event
{@link CompactionPlanEvent}.
*/
public class CompactFunction extends KeyedProcessFunction<Long,
CompactionPlanEvent, CompactionCommitEvent> {
+ private static final Logger LOG =
LoggerFactory.getLogger(CompactFunction.class);
/**
* Config options.
@@ -52,6 +57,11 @@ public class CompactFunction extends
KeyedProcessFunction<Long, CompactionPlanEv
*/
private int taskID;
+ /**
+ * Executor service to execute the compaction task.
+ */
+ private transient NonThrownExecutor executor;
+
public CompactFunction(Configuration conf) {
this.conf = conf;
}
@@ -60,23 +70,33 @@ public class CompactFunction extends
KeyedProcessFunction<Long, CompactionPlanEv
public void open(Configuration parameters) throws Exception {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(conf,
getRuntimeContext());
+ this.executor = new NonThrownExecutor(LOG);
}
@Override
public void processElement(CompactionPlanEvent event, Context context,
Collector<CompactionCommitEvent> collector) throws Exception {
final String instantTime = event.getCompactionInstantTime();
final CompactionOperation compactionOperation = event.getOperation();
+ // executes the compaction task asynchronously to not block the checkpoint
barrier propagate.
+ executor.execute(
+ () -> {
+ HoodieFlinkMergeOnReadTableCompactor compactor = new
HoodieFlinkMergeOnReadTableCompactor();
+ List<WriteStatus> writeStatuses = compactor.compact(
+ new HoodieFlinkCopyOnWriteTable<>(
+ this.writeClient.getConfig(),
+ this.writeClient.getEngineContext(),
+ this.writeClient.getHoodieTable().getMetaClient()),
+ this.writeClient.getHoodieTable().getMetaClient(),
+ this.writeClient.getConfig(),
+ compactionOperation,
+ instantTime);
+ collector.collect(new CompactionCommitEvent(instantTime,
writeStatuses, taskID));
+ }, "Execute compaction for instant %s from task %d", instantTime,
taskID
+ );
+ }
- HoodieFlinkMergeOnReadTableCompactor compactor = new
HoodieFlinkMergeOnReadTableCompactor();
- List<WriteStatus> writeStatuses = compactor.compact(
- new HoodieFlinkCopyOnWriteTable<>(
- this.writeClient.getConfig(),
- this.writeClient.getEngineContext(),
- this.writeClient.getHoodieTable().getMetaClient()),
- this.writeClient.getHoodieTable().getMetaClient(),
- this.writeClient.getConfig(),
- compactionOperation,
- instantTime);
- collector.collect(new CompactionCommitEvent(instantTime, writeStatuses,
taskID));
+ @VisibleForTesting
+ public void setExecutor(NonThrownExecutor executor) {
+ this.executor = executor;
}
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 86dae20..41831cd 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -23,8 +23,6 @@ import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.sink.CleanFunction;
@@ -37,8 +35,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -67,13 +66,9 @@ public class CompactionCommitSink extends
CleanFunction<CompactionCommitEvent> {
/**
* Buffer to collect the event from each compact task {@code
CompactFunction}.
+ * The key is the instant time.
*/
- private transient List<CompactionCommitEvent> commitBuffer;
-
- /**
- * Current on-going compaction instant time.
- */
- private String compactionInstantTime;
+ private transient Map<String, List<CompactionCommitEvent>> commitBuffer;
public CompactionCommitSink(Configuration conf) {
super(conf);
@@ -84,36 +79,32 @@ public class CompactionCommitSink extends
CleanFunction<CompactionCommitEvent> {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.writeClient = StreamerUtil.createWriteClient(conf,
getRuntimeContext());
- this.commitBuffer = new ArrayList<>();
+ this.commitBuffer = new HashMap<>();
}
@Override
public void invoke(CompactionCommitEvent event, Context context) throws
Exception {
- if (compactionInstantTime == null) {
- compactionInstantTime = event.getInstant();
- } else if (!event.getInstant().equals(compactionInstantTime)) {
- // last compaction still not finish, rolls it back
- HoodieInstant inflightInstant =
HoodieTimeline.getCompactionInflightInstant(this.compactionInstantTime);
- writeClient.rollbackInflightCompaction(inflightInstant);
- this.compactionInstantTime = event.getInstant();
- }
- this.commitBuffer.add(event);
- commitIfNecessary();
+ final String instant = event.getInstant();
+ commitBuffer.computeIfAbsent(instant, k -> new ArrayList<>())
+ .add(event);
+ commitIfNecessary(instant, commitBuffer.get(instant));
}
/**
* Condition to commit: the commit buffer has equal size with the compaction
plan operations
* and all the compact commit event {@link CompactionCommitEvent} has the
same compaction instant time.
+ *
+ * @param instant Compaction commit instant time
+ * @param events Commit events ever received for the instant
*/
- private void commitIfNecessary() throws IOException {
+ private void commitIfNecessary(String instant, List<CompactionCommitEvent>
events) throws IOException {
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
- this.writeClient.getHoodieTable().getMetaClient(),
compactionInstantTime);
- boolean isReady = compactionPlan.getOperations().size() ==
commitBuffer.size()
- && commitBuffer.stream().allMatch(event -> event != null &&
Objects.equals(event.getInstant(), compactionInstantTime));
+ this.writeClient.getHoodieTable().getMetaClient(), instant);
+ boolean isReady = compactionPlan.getOperations().size() == events.size();
if (!isReady) {
return;
}
- List<WriteStatus> statuses = this.commitBuffer.stream()
+ List<WriteStatus> statuses = events.stream()
.map(CompactionCommitEvent::getWriteStatuses)
.flatMap(Collection::stream)
.collect(Collectors.toList());
@@ -127,16 +118,15 @@ public class CompactionCommitSink extends
CleanFunction<CompactionCommitEvent> {
}
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY,
writeClient.getConfig().getSchema());
this.writeClient.completeCompaction(
- metadata, statuses, this.writeClient.getHoodieTable(),
compactionInstantTime);
+ metadata, statuses, this.writeClient.getHoodieTable(), instant);
}
// commit the compaction
- this.writeClient.commitCompaction(compactionInstantTime, statuses,
Option.empty());
+ this.writeClient.commitCompaction(instant, statuses, Option.empty());
// reset the status
- reset();
+ reset(instant);
}
- private void reset() {
- this.commitBuffer.clear();
- this.compactionInstantTime = null;
+ private void reset(String instant) {
+ this.commitBuffer.remove(instant);
}
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 5ea5f92..9c23259 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -21,7 +21,6 @@ package org.apache.hudi.sink.partitioner;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -56,8 +55,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
/**
* The function to build the write profile incrementally for records within a
checkpoint,
@@ -107,28 +104,10 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
private final boolean isChangingRecords;
/**
- * All the partition paths when the task starts. It is used to help checking
whether all the partitions
- * are loaded into the state.
- */
- private transient Set<String> initialPartitionsToLoad;
-
- /**
* State to book-keep which partition is loaded into the index state {@code
indexState}.
*/
private MapState<String, Integer> partitionLoadState;
- /**
- * Whether all partitions are loaded, if it is true,
- * we can only check the state for locations.
- */
- private boolean allPartitionsLoaded = false;
-
- /**
- * Flag saying whether to check that all the partitions are loaded.
- * So that there is chance that flag {@code allPartitionsLoaded} becomes
true.
- */
- private boolean checkPartition = true;
-
public BucketAssignFunction(Configuration conf) {
this.conf = conf;
this.isChangingRecords = WriteOperationType.isChangingRecords(
@@ -144,12 +123,11 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
new SerializableConfiguration(this.hadoopConf),
new FlinkTaskContextSupplier(getRuntimeContext()));
this.bucketAssigner = BucketAssigners.create(
+ getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getNumberOfParallelSubtasks(),
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
context,
writeConfig);
-
- // initialize and check the partitions load state
- loadInitialPartitions();
}
@Override
@@ -181,15 +159,7 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
final BucketInfo bucketInfo;
final HoodieRecordLocation location;
- // Checks whether all the partitions are loaded first.
- if (checkPartition && !allPartitionsLoaded) {
- checkPartitionsLoaded();
- checkPartition = false;
- }
-
- if (!allPartitionsLoaded
- && initialPartitionsToLoad.contains(hoodieKey.getPartitionPath()) //
this is an existing partition
- && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
+ if (!partitionLoadState.contains(hoodieKey.getPartitionPath())) {
// If the partition records are never loaded, load the records first.
loadRecords(hoodieKey.getPartitionPath());
}
@@ -226,9 +196,6 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
public void notifyCheckpointComplete(long l) {
// Refresh the table state when there are new commits.
this.bucketAssigner.refreshTable();
- if (!allPartitionsLoaded) {
- checkPartition = true;
- }
}
/**
@@ -241,12 +208,21 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
List<HoodieBaseFile> latestBaseFiles =
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath,
hoodieTable);
+ final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+ final int maxParallelism =
getRuntimeContext().getMaxNumberOfParallelSubtasks();
+ final int taskID = getRuntimeContext().getIndexOfThisSubtask();
for (HoodieBaseFile baseFile : latestBaseFiles) {
List<HoodieKey> hoodieKeys =
ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new
Path(baseFile.getPath()));
hoodieKeys.forEach(hoodieKey -> {
try {
- this.indexState.put(hoodieKey, new
HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+ // Reference: org.apache.flink.streaming.api.datastream.KeyedStream,
+ // the input records is shuffled by record key
+ boolean shouldLoad =
KeyGroupRangeAssignment.assignKeyToParallelOperator(
+ hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID;
+ if (shouldLoad) {
+ this.indexState.put(hoodieKey, new
HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
+ }
} catch (Exception e) {
throw new HoodieIOException("Error when load record keys from file:
" + baseFile);
}
@@ -256,49 +232,9 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
partitionLoadState.put(partitionPath, 0);
}
- /**
- * Loads the existing partitions for this task.
- */
- private void loadInitialPartitions() {
- List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
- this.conf.getString(FlinkOptions.PATH), false, false, false);
- final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
- final int maxParallelism =
getRuntimeContext().getMaxNumberOfParallelSubtasks();
- final int taskID = getRuntimeContext().getIndexOfThisSubtask();
- // reference: org.apache.flink.streaming.api.datastream.KeyedStream
- this.initialPartitionsToLoad = allPartitionPaths.stream()
- .filter(partition ->
KeyGroupRangeAssignment.assignKeyToParallelOperator(partition, maxParallelism,
parallelism) == taskID)
- .collect(Collectors.toSet());
- }
-
- /**
- * Checks whether all the partitions of the table are loaded into the state,
- * set the flag {@code allPartitionsLoaded} to true if it is.
- */
- private void checkPartitionsLoaded() {
- for (String partition : this.initialPartitionsToLoad) {
- try {
- if (!this.partitionLoadState.contains(partition)) {
- return;
- }
- } catch (Exception e) {
- LOG.warn("Error when check whether all partitions are loaded,
ignored", e);
- throw new HoodieException(e);
- }
- }
- this.allPartitionsLoaded = true;
- }
-
- @VisibleForTesting
- public boolean isAllPartitionsLoaded() {
- return this.allPartitionsLoaded;
- }
-
@VisibleForTesting
public void clearIndexState() {
- this.allPartitionsLoaded = false;
this.indexState.clear();
- loadInitialPartitions();
}
@VisibleForTesting
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
index 58bbe9c..d89ad83 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
@@ -61,6 +61,16 @@ public class BucketAssigner {
private static final Logger LOG = LogManager.getLogger(BucketAssigner.class);
/**
+ * Task ID.
+ */
+ private final int taskID;
+
+ /**
+ * Number of tasks.
+ */
+ private final int numTasks;
+
+ /**
* Remembers what type each bucket is for later.
*/
private final HashMap<String, BucketInfo> bucketInfoMap;
@@ -104,12 +114,16 @@ public class BucketAssigner {
private final Map<String, NewFileAssignState> newFileAssignStates;
public BucketAssigner(
+ int taskID,
+ int numTasks,
HoodieFlinkEngineContext context,
HoodieWriteConfig config) {
bucketInfoMap = new HashMap<>();
partitionSmallFilesMap = new HashMap<>();
smallFileAssignStates = new HashMap<>();
newFileAssignStates = new HashMap<>();
+ this.taskID = taskID;
+ this.numTasks = numTasks;
this.context = context;
this.config = config;
this.table = HoodieFlinkTable.create(this.config, this.context);
@@ -187,7 +201,7 @@ public class BucketAssigner {
if (partitionSmallFilesMap.containsKey(partitionPath)) {
return partitionSmallFilesMap.get(partitionPath);
}
- List<SmallFile> smallFiles = getSmallFiles(partitionPath);
+ List<SmallFile> smallFiles =
smallFilesOfThisTask(getSmallFiles(partitionPath));
if (smallFiles.size() > 0) {
LOG.info("For partitionPath : " + partitionPath + " Small Files => " +
smallFiles);
partitionSmallFilesMap.put(partitionPath, smallFiles);
@@ -240,6 +254,15 @@ public class BucketAssigner {
return smallFileLocations;
}
+ private List<SmallFile> smallFilesOfThisTask(List<SmallFile> smallFiles) {
+ // computes the small files to write inserts for this task.
+ List<SmallFile> smallFilesOfThisTask = new ArrayList<>();
+ for (int i = taskID; i < smallFiles.size(); i += numTasks) {
+ smallFilesOfThisTask.add(smallFiles.get(i));
+ }
+ return smallFilesOfThisTask;
+ }
+
/**
* Obtains the average record size based on records written during previous
commits. Used for estimating how many
* records pack into one file.
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
index f5703f1..237ec27 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
@@ -33,20 +33,24 @@ public abstract class BucketAssigners {
/**
* Creates a {@code BucketAssigner}.
*
+ * @param taskID The task ID
+ * @param numTasks The number of tasks
* @param tableType The table type
* @param context The engine context
* @param config The configuration
* @return the bucket assigner instance
*/
public static BucketAssigner create(
+ int taskID,
+ int numTasks,
HoodieTableType tableType,
HoodieFlinkEngineContext context,
HoodieWriteConfig config) {
switch (tableType) {
case COPY_ON_WRITE:
- return new BucketAssigner(context, config);
+ return new BucketAssigner(taskID, numTasks, context, config);
case MERGE_ON_READ:
- return new DeltaBucketAssigner(context, config);
+ return new DeltaBucketAssigner(taskID, numTasks, context, config);
default:
throw new AssertionError();
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java
index 895f593..deb8250 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java
@@ -40,8 +40,12 @@ import java.util.stream.Collectors;
* <p>Note: assumes the index can always index log files for Flink write.
*/
public class DeltaBucketAssigner extends BucketAssigner {
- public DeltaBucketAssigner(HoodieFlinkEngineContext context,
HoodieWriteConfig config) {
- super(context, config);
+ public DeltaBucketAssigner(
+ int taskID,
+ int numTasks,
+ HoodieFlinkEngineContext context,
+ HoodieWriteConfig config) {
+ super(taskID, numTasks, context, config);
}
@Override
@@ -77,11 +81,13 @@ public class DeltaBucketAssigner extends BucketAssigner {
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
} else {
- HoodieLogFile logFile =
smallFileSlice.getLogFiles().findFirst().get();
- sf.location = new
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
- FSUtils.getFileIdFromLogPath(logFile.getPath()));
- sf.sizeBytes = getTotalFileSize(smallFileSlice);
- smallFileLocations.add(sf);
+ smallFileSlice.getLogFiles().findFirst().ifPresent(logFile -> {
+ // in case something went wrong and the file slice has no log
file
+ sf.location = new
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+ FSUtils.getFileIdFromLogPath(logFile.getPath()));
+ sf.sizeBytes = getTotalFileSize(smallFileSlice);
+ smallFileLocations.add(sf);
+ });
}
}
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
index 1d41003..5bd3c68 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/transform/RowDataToHoodieFunction.java
@@ -23,6 +23,9 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.util.RowDataToAvroConverters;
@@ -36,7 +39,11 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
+import javax.annotation.Nullable;
+
import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
/**
* Function that transforms RowData to HoodieRecord.
@@ -64,6 +71,11 @@ public class RowDataToHoodieFunction<I extends RowData, O
extends HoodieRecord<?
private transient KeyGenerator keyGenerator;
/**
+ * Utilities to create a hoodie payload instance.
+ */
+ private transient PayloadCreation payloadCreation;
+
+ /**
* Config options.
*/
private final Configuration config;
@@ -79,6 +91,7 @@ public class RowDataToHoodieFunction<I extends RowData, O
extends HoodieRecord<?
this.avroSchema = StreamerUtil.getSourceSchema(this.config);
this.converter = RowDataToAvroConverters.createConverter(this.rowType);
this.keyGenerator =
StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
+ this.payloadCreation = PayloadCreation.instance(config);
}
@SuppressWarnings("unchecked")
@@ -95,19 +108,61 @@ public class RowDataToHoodieFunction<I extends RowData, O
extends HoodieRecord<?
* @throws IOException if error occurs
*/
@SuppressWarnings("rawtypes")
- private HoodieRecord toHoodieRecord(I record) throws IOException {
- boolean shouldCombine =
this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
- ||
WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) ==
WriteOperationType.UPSERT;
+ private HoodieRecord toHoodieRecord(I record) throws Exception {
GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema,
record);
- final String payloadClazz =
this.config.getString(FlinkOptions.PAYLOAD_CLASS);
- Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
- this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false);
final HoodieKey hoodieKey = keyGenerator.getKey(gr);
// nullify the payload insert data to mark the record as a DELETE
- gr = record.getRowKind() == RowKind.DELETE ? null : gr;
- HoodieRecordPayload payload = shouldCombine
- ? StreamerUtil.createPayload(payloadClazz, gr, orderingVal)
- : StreamerUtil.createPayload(payloadClazz, gr);
+ final boolean isDelete = record.getRowKind() == RowKind.DELETE;
+ HoodieRecordPayload payload = payloadCreation.createPayload(gr, isDelete);
return new HoodieRecord<>(hoodieKey, payload);
}
+
+ /**
+ * Util to create a hoodie payload instance.
+ */
+ private static class PayloadCreation implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final boolean shouldCombine;
+ private final Constructor<?> constructor;
+ private final String preCombineField;
+
+ private PayloadCreation(
+ boolean shouldCombine,
+ Constructor<?> constructor,
+ @Nullable String preCombineField) {
+ this.shouldCombine = shouldCombine;
+ this.constructor = constructor;
+ this.preCombineField = preCombineField;
+ }
+
+ public static PayloadCreation instance(Configuration conf) throws
Exception {
+ boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
+ ||
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) ==
WriteOperationType.UPSERT;
+ String preCombineField = null;
+ final Class<?>[] argTypes;
+ final Constructor<?> constructor;
+ if (shouldCombine) {
+ preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
+ argTypes = new Class<?>[] {GenericRecord.class, Comparable.class};
+ } else {
+ argTypes = new Class<?>[] {Option.class};
+ }
+ final String clazz = conf.getString(FlinkOptions.PAYLOAD_CLASS);
+ constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes);
+ return new PayloadCreation(shouldCombine, constructor, preCombineField);
+ }
+
+ public HoodieRecordPayload<?> createPayload(GenericRecord record, boolean
isDelete) throws Exception {
+ if (shouldCombine) {
+ ValidationUtils.checkState(preCombineField != null);
+ Comparable<?> orderingVal = (Comparable<?>)
HoodieAvroUtils.getNestedFieldVal(record,
+ preCombineField, false);
+ return (HoodieRecordPayload<?>) constructor.newInstance(
+ isDelete ? null : record, orderingVal);
+ } else {
+ return (HoodieRecordPayload<?>)
this.constructor.newInstance(Option.of(record));
+ }
+ }
+ }
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java
new file mode 100644
index 0000000..b595767
--- /dev/null
+++
b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CoordinatorExecutor.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.slf4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Coordinator executor that executes the tasks asynchronously, it fails the
job
+ * for any task exceptions.
+ *
+ * <p>We need this because the coordinator methods are called by
+ * the Job Manager's main thread (mailbox thread); we execute the methods
asynchronously
+ * to avoid blocking the main thread.
+ */
+public class CoordinatorExecutor extends NonThrownExecutor {
+ private final OperatorCoordinator.Context context;
+
+ public CoordinatorExecutor(OperatorCoordinator.Context context, Logger
logger) {
+ super(logger);
+ this.context = context;
+ }
+
+ @Override
+ protected void exceptionHook(String actionString, Throwable t) {
+ this.context.failJob(new HoodieException(actionString, t));
+ }
+
+ @Override
+ public void close() throws Exception {
+ // wait for the remaining tasks to finish.
+ executor.shutdown();
+ // We do not expect this to actually block for long. At this point, there
should
+ // be very few tasks running in the executor, if any.
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+ }
+}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java
b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java
index 1d0542e..87c9c01 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java
@@ -21,7 +21,6 @@ package org.apache.hudi.sink.utils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -31,15 +30,16 @@ import java.util.concurrent.TimeUnit;
* An executor service that catches all the throwable with logging.
*/
public class NonThrownExecutor implements AutoCloseable {
- private static final Logger LOG =
LoggerFactory.getLogger(NonThrownExecutor.class);
+ private final Logger logger;
/**
* A single-thread executor to handle all the asynchronous jobs.
*/
- private final ExecutorService executor;
+ protected final ExecutorService executor;
- public NonThrownExecutor() {
+ public NonThrownExecutor(Logger logger) {
this.executor = Executors.newSingleThreadExecutor();
+ this.logger = logger;
}
/**
@@ -48,24 +48,30 @@ public class NonThrownExecutor implements AutoCloseable {
public void execute(
final ThrowingRunnable<Throwable> action,
final String actionName,
- final String instant) {
+ final Object... actionParams) {
executor.execute(
() -> {
+ final String actionString = String.format(actionName, actionParams);
try {
action.run();
- LOG.info("Executor executes action [{}] for instant [{}]
success!", actionName, instant);
+ logger.info("Executor executes action [{}] success!",
actionString);
} catch (Throwable t) {
// if we have a JVM critical error, promote it immediately, there
is a good
// chance the
// logging or job failing will not succeed any more
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
- final String errMsg = String.format("Executor executes action [%s]
error", actionName);
- LOG.error(errMsg, t);
+ final String errMsg = String.format("Executor executes action [%s]
error", actionString);
+ logger.error(errMsg, t);
+ exceptionHook(errMsg, t);
}
});
}
+ protected void exceptionHook(String errMsg, Throwable t) {
+ // for sub-class to override.
+ }
+
@Override
public void close() throws Exception {
if (executor != null) {
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 28568f7..a568a3f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -69,8 +69,8 @@ public class HoodieTableSink implements DynamicTableSink,
SupportsPartitioning {
DataStream<Object> pipeline = dataStream
.map(new RowDataToHoodieFunction<>(rowType, conf),
TypeInformation.of(HoodieRecord.class))
- // Key-by partition path, to avoid multiple subtasks write to a
partition at the same time
- .keyBy(HoodieRecord::getPartitionPath)
+ // Key-by record key, to avoid multiple subtasks write to a bucket
at the same time
+ .keyBy(HoodieRecord::getRecordKey)
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 900ec41..8fed30b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
@@ -179,19 +180,6 @@ public class StreamerUtil {
}
}
- /**
- * Create a payload class via reflection, do not ordering/precombine value.
- */
- public static HoodieRecordPayload createPayload(String payloadClass,
GenericRecord record)
- throws IOException {
- try {
- return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
- new Class<?>[] {Option.class}, Option.of(record));
- } catch (Throwable e) {
- throw new IOException("Could not create payload for class: " +
payloadClass, e);
- }
- }
-
public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig
conf) {
return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf));
}
@@ -215,6 +203,12 @@ public class StreamerUtil {
// actually Flink cleaning is always with parallelism 1 now
.withCleanerParallelism(20)
.build())
+ .withMemoryConfig(
+ HoodieMemoryConfig.newBuilder()
+ .withMaxMemoryMaxSize(
+ conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) *
1024 * 1024L,
+ conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) *
1024 * 1024L
+ ).build())
.forTable(conf.getString(FlinkOptions.TABLE_NAME))
.withAutoCommit(false)
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
index 94e0975..361fcef 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
@@ -120,8 +120,8 @@ public class StreamWriteITCase extends TestLogger {
.map(record ->
deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4)
.map(new RowDataToHoodieFunction<>(rowType, conf),
TypeInformation.of(HoodieRecord.class))
- // Key-by partition path, to avoid multiple subtasks write to a
partition at the same time
- .keyBy(HoodieRecord::getPartitionPath)
+ // Key-by record key, to avoid multiple subtasks write to a bucket at
the same time
+ .keyBy(HoodieRecord::getRecordKey)
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
@@ -179,8 +179,8 @@ public class StreamWriteITCase extends TestLogger {
.name("instant_generator")
.uid("instant_generator_id")
- // Keyby partition path, to avoid multiple subtasks writing to a
partition at the same time
- .keyBy(HoodieRecord::getPartitionPath)
+ // Key-by record key, to avoid multiple subtasks write to a bucket at
the same time
+ .keyBy(HoodieRecord::getRecordKey)
// use the bucket assigner to generate bucket IDs
.transform(
"bucket_assigner",
@@ -249,8 +249,8 @@ public class StreamWriteITCase extends TestLogger {
.map(record ->
deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
.setParallelism(4)
.map(new RowDataToHoodieFunction<>(rowType, conf),
TypeInformation.of(HoodieRecord.class))
- // Key-by partition path, to avoid multiple subtasks write to a
partition at the same time
- .keyBy(HoodieRecord::getPartitionPath)
+ // Key-by record key, to avoid multiple subtasks write to a bucket at
the same time
+ .keyBy(HoodieRecord::getRecordKey)
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 321abfa..0afd414 100644
---
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -23,12 +23,15 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
+import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -42,11 +45,12 @@ import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -60,9 +64,11 @@ public class TestStreamWriteOperatorCoordinator {
@BeforeEach
public void before() throws Exception {
+ OperatorCoordinator.Context context = new
MockOperatorCoordinatorContext(new OperatorID(), 2);
coordinator = new StreamWriteOperatorCoordinator(
- TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2);
+ TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()),
context);
coordinator.start();
+ coordinator.setExecutor(new MockCoordinatorExecutor(context));
}
@AfterEach
@@ -99,8 +105,8 @@ public class TestStreamWriteOperatorCoordinator {
coordinator.notifyCheckpointComplete(1);
String inflight = coordinator.getWriteClient()
- .getInflightAndRequestedInstant("COPY_ON_WRITE");
- String lastCompleted =
coordinator.getWriteClient().getLastCompletedInstant("COPY_ON_WRITE");
+ .getInflightAndRequestedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+ String lastCompleted =
coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
assertThat("Instant should be complete", lastCompleted, is(instant));
assertNotEquals("", inflight, "Should start a new instant");
assertNotEquals(instant, inflight, "Should start a new instant");
@@ -131,27 +137,43 @@ public class TestStreamWriteOperatorCoordinator {
.instantTime("abc")
.writeStatus(Collections.emptyList())
.build();
- assertThrows(IllegalStateException.class,
- () -> coordinator.handleEventFromOperator(0, event),
+
+ assertError(() -> coordinator.handleEventFromOperator(0, event),
"Receive an unexpected event for instant abc from task 0");
}
@Test
- public void testCheckpointCompleteWithException() {
+ public void testCheckpointCompleteWithPartialEvents() {
final CompletableFuture<byte[]> future = new CompletableFuture<>();
coordinator.checkpointCoordinator(1, future);
- String inflightInstant = coordinator.getInstant();
+ String instant = coordinator.getInstant();
OperatorEvent event = BatchWriteSuccessEvent.builder()
.taskID(0)
- .instantTime(inflightInstant)
+ .instantTime(instant)
.writeStatus(Collections.emptyList())
.build();
coordinator.handleEventFromOperator(0, event);
- assertThrows(HoodieException.class,
- () -> coordinator.notifyCheckpointComplete(1),
- "org.apache.hudi.exception.HoodieException: Instant [20210330153432]
has a complete checkpoint [1],\n"
- + "but the coordinator has not received full write success
events,\n"
- + "rolls back the instant and rethrow");
+
+ assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1),
+ "Returns early for empty write results");
+ String lastCompleted =
coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+ assertNull(lastCompleted, "Returns early for empty write results");
+ assertNull(coordinator.getEventBuffer()[0]);
+
+ WriteStatus writeStatus1 = new WriteStatus(false, 0.2D);
+ writeStatus1.setPartitionPath("par2");
+ writeStatus1.setStat(new HoodieWriteStat());
+ OperatorEvent event1 = BatchWriteSuccessEvent.builder()
+ .taskID(1)
+ .instantTime(instant)
+ .writeStatus(Collections.singletonList(writeStatus1))
+ .isLastBatch(true)
+ .build();
+ coordinator.handleEventFromOperator(1, event1);
+ assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2),
+ "Commits the instant with partial events anyway");
+ lastCompleted =
coordinator.getWriteClient().getLastCompletedInstant(FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+ assertThat("Commits the instant with partial events anyway",
lastCompleted, is(instant));
}
@Test
@@ -159,8 +181,10 @@ public class TestStreamWriteOperatorCoordinator {
// override the default configuration
Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.HIVE_SYNC_ENABLED, true);
- coordinator = new StreamWriteOperatorCoordinator(conf, 1);
+ OperatorCoordinator.Context context = new
MockOperatorCoordinatorContext(new OperatorID(), 1);
+ coordinator = new StreamWriteOperatorCoordinator(conf, context);
coordinator.start();
+ coordinator.setExecutor(new MockCoordinatorExecutor(context));
String instant = coordinator.getInstant();
assertNotEquals("", instant);
@@ -180,4 +204,16 @@ public class TestStreamWriteOperatorCoordinator {
// never throw for hive synchronization now
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1));
}
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private void assertError(Runnable runnable, String message) {
+ runnable.run();
+ // wait a little while for the task to finish
+ assertThat(coordinator.getContext(),
instanceOf(MockOperatorCoordinatorContext.class));
+ MockOperatorCoordinatorContext context = (MockOperatorCoordinatorContext)
coordinator.getContext();
+ assertTrue(context.isJobFailed(), message);
+ }
}
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 0e53cfa..2384f7e 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.hudi.utils.TestConfigurations;
@@ -53,11 +52,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
- * Test cases for StreamingSinkFunction.
+ * Test cases for stream write.
*/
public class TestWriteCopyOnWrite {
@@ -193,19 +191,19 @@ public class TestWriteCopyOnWrite {
assertThat(writeStatuses.size(), is(0)); // no data write
// fails the checkpoint
- assertThrows(HoodieException.class,
- () -> funcWrapper.checkpointFails(1),
- "The last checkpoint was aborted, roll back the last write and throw");
+ funcWrapper.checkpointFails(1);
+ assertFalse(funcWrapper.getCoordinatorContext().isJobFailed(),
+ "The last checkpoint was aborted, ignore the events");
- // the instant metadata should be cleared
- checkInstantState(funcWrapper.getWriteClient(),
HoodieInstant.State.REQUESTED, null);
+ // the instant metadata should be reused
+ checkInstantState(funcWrapper.getWriteClient(),
HoodieInstant.State.REQUESTED, instant);
checkInstantState(funcWrapper.getWriteClient(),
HoodieInstant.State.COMPLETED, null);
for (RowData rowData : TestData.DATA_SET_INSERT) {
funcWrapper.invoke(rowData);
}
- // this returns early cause there is no inflight instant
+ // this returns early because there is no inflight instant
funcWrapper.checkpointFunction(2);
// do not sent the write event and fails the checkpoint,
// behaves like the last checkpoint is successful.
@@ -501,16 +499,11 @@ public class TestWriteCopyOnWrite {
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the
event");
checkInstantState(funcWrapper.getWriteClient(),
HoodieInstant.State.REQUESTED, instant);
- assertFalse(funcWrapper.isAllPartitionsLoaded(),
- "All partitions assume to be loaded into the index state");
+
funcWrapper.checkpointComplete(2);
// the coordinator checkpoint commits the inflight instant.
checkInstantState(funcWrapper.getWriteClient(),
HoodieInstant.State.COMPLETED, instant);
checkWrittenData(tempFile, EXPECTED2);
- // next element triggers all partitions load check
- funcWrapper.invoke(TestData.DATA_SET_INSERT.get(0));
- assertTrue(funcWrapper.isAllPartitionsLoaded(),
- "All partitions assume to be loaded into the index state");
}
// -------------------------------------------------------------------------
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 04cee44..33457b5 100644
---
a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++
b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -146,6 +146,55 @@ public class TestBucketAssigner {
assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
}
+ /**
+ * Test that only partial small files are assigned to the task.
+ */
+ @Test
+ public void testInsertWithPartialSmallFiles() {
+ SmallFile f0 = new SmallFile();
+ f0.location = new HoodieRecordLocation("t0", "f0");
+ f0.sizeBytes = 12;
+
+ SmallFile f1 = new SmallFile();
+ f1.location = new HoodieRecordLocation("t0", "f1");
+ f1.sizeBytes = 122879; // no space left to append new records to this bucket
+
+ SmallFile f2 = new SmallFile();
+ f2.location = new HoodieRecordLocation("t0", "f2");
+ f2.sizeBytes = 56;
+
+ Map<String, List<SmallFile>> smallFilesMap = new HashMap<>();
+ smallFilesMap.put("par1", Arrays.asList(f0, f1, f2));
+
+ MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(0, 2,
context, writeConfig, smallFilesMap);
+ BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
+ assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+ mockBucketAssigner.addInsert("par1");
+ bucketInfo = mockBucketAssigner.addInsert("par1");
+ assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+ bucketInfo = mockBucketAssigner.addInsert("par3");
+ assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
+
+ bucketInfo = mockBucketAssigner.addInsert("par3");
+ assertBucketEquals(bucketInfo, "par3", BucketType.INSERT);
+
+ MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(1, 2,
context, writeConfig, smallFilesMap);
+ BucketInfo bucketInfo2 = mockBucketAssigner2.addInsert("par1");
+ assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1");
+
+ mockBucketAssigner2.addInsert("par1");
+ bucketInfo2 = mockBucketAssigner2.addInsert("par1");
+ assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1");
+
+ bucketInfo2 = mockBucketAssigner2.addInsert("par3");
+ assertBucketEquals(bucketInfo2, "par3", BucketType.INSERT);
+
+ bucketInfo2 = mockBucketAssigner2.addInsert("par3");
+ assertBucketEquals(bucketInfo2, "par3", BucketType.INSERT);
+ }
+
@Test
public void testUpdateAndInsertWithSmallFiles() {
SmallFile f0 = new SmallFile();
@@ -187,6 +236,60 @@ public class TestBucketAssigner {
assertBucketEquals(bucketInfo, "par2", BucketType.UPDATE, "f2");
}
+ /**
+ * Test that only partial small files are assigned to the task.
+ */
+ @Test
+ public void testUpdateAndInsertWithPartialSmallFiles() {
+ SmallFile f0 = new SmallFile();
+ f0.location = new HoodieRecordLocation("t0", "f0");
+ f0.sizeBytes = 12;
+
+ SmallFile f1 = new SmallFile();
+ f1.location = new HoodieRecordLocation("t0", "f1");
+ f1.sizeBytes = 122879; // no space left to append new records to this bucket
+
+ SmallFile f2 = new SmallFile();
+ f2.location = new HoodieRecordLocation("t0", "f2");
+ f2.sizeBytes = 56;
+
+ Map<String, List<SmallFile>> smallFilesMap = new HashMap<>();
+ smallFilesMap.put("par1", Arrays.asList(f0, f1, f2));
+
+ MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(0, 2,
context, writeConfig, smallFilesMap);
+ mockBucketAssigner.addUpdate("par1", "f0");
+
+ BucketInfo bucketInfo = mockBucketAssigner.addInsert("par1");
+ assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+ mockBucketAssigner.addInsert("par1");
+ bucketInfo = mockBucketAssigner.addInsert("par1");
+ assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+ mockBucketAssigner.addUpdate("par1", "f2");
+
+ mockBucketAssigner.addInsert("par1");
+ bucketInfo = mockBucketAssigner.addInsert("par1");
+ assertBucketEquals(bucketInfo, "par1", BucketType.UPDATE, "f0");
+
+
+ MockBucketAssigner mockBucketAssigner2 = new MockBucketAssigner(1, 2,
context, writeConfig, smallFilesMap);
+ mockBucketAssigner2.addUpdate("par1", "f0");
+
+ BucketInfo bucketInfo2 = mockBucketAssigner2.addInsert("par1");
+ assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1");
+
+ mockBucketAssigner2.addInsert("par1");
+ bucketInfo2 = mockBucketAssigner2.addInsert("par1");
+ assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1");
+
+ mockBucketAssigner2.addUpdate("par1", "f2");
+
+ mockBucketAssigner2.addInsert("par1");
+ bucketInfo2 = mockBucketAssigner2.addInsert("par1");
+ assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1");
+ }
+
private void assertBucketEquals(
BucketInfo bucketInfo,
String partition,
@@ -220,7 +323,16 @@ public class TestBucketAssigner {
HoodieFlinkEngineContext context,
HoodieWriteConfig config,
Map<String, List<SmallFile>> smallFilesMap) {
- super(context, config);
+ this(0, 1, context, config, smallFilesMap);
+ }
+
+ MockBucketAssigner(
+ int taskID,
+ int numTasks,
+ HoodieFlinkEngineContext context,
+ HoodieWriteConfig config,
+ Map<String, List<SmallFile>> smallFilesMap) {
+ super(taskID, numTasks, context, config);
this.smallFilesMap = smallFilesMap;
}
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
index e8796ec..fe3dd81 100644
---
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
+++
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
@@ -28,7 +28,9 @@ import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.operators.Output;
@@ -78,6 +80,9 @@ public class CompactFunctionWrapper {
compactFunction = new CompactFunction(conf);
compactFunction.setRuntimeContext(runtimeContext);
compactFunction.open(conf);
+ final NonThrownExecutor syncExecutor = new MockCoordinatorExecutor(
+ new MockOperatorCoordinatorContext(new OperatorID(), 1));
+ compactFunction.setExecutor(syncExecutor);
commitSink = new CompactionCommitSink(conf);
commitSink.setRuntimeContext(runtimeContext);
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java
new file mode 100644
index 0000000..099dfd6
--- /dev/null
+++
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCoordinatorExecutor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A mock {@link CoordinatorExecutor} that executes the actions synchronously.
+ */
+public class MockCoordinatorExecutor extends CoordinatorExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(MockCoordinatorExecutor.class);
+
+ public MockCoordinatorExecutor(OperatorCoordinator.Context context) {
+ super(context, LOG);
+ }
+
+ @Override
+ public void execute(ThrowingRunnable<Throwable> action, String actionName,
Object... actionParams) {
+ final String actionString = String.format(actionName, actionParams);
+ try {
+ action.run();
+ LOG.info("Executor executes action [{}] success!", actionString);
+ } catch (Throwable t) {
+      // if we have a JVM critical error, promote it immediately, there is a good
+      // chance the logging or job failing will not succeed any more
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+ exceptionHook(actionString, t);
+ }
+ }
+}
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 72f2e89..a4b6c16 100644
---
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -33,7 +33,9 @@ import org.apache.hudi.utils.TestConfigurations;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
@@ -57,6 +59,7 @@ public class StreamWriteFunctionWrapper<I> {
private final IOManager ioManager;
private final StreamingRuntimeContext runtimeContext;
private final MockOperatorEventGateway gateway;
+ private final MockOperatorCoordinatorContext coordinatorContext;
private final StreamWriteOperatorCoordinator coordinator;
private final MockFunctionInitializationContext
functionInitializationContext;
@@ -84,13 +87,15 @@ public class StreamWriteFunctionWrapper<I> {
this.gateway = new MockOperatorEventGateway();
this.conf = conf;
// one function
- this.coordinator = new StreamWriteOperatorCoordinator(conf, 1);
+ this.coordinatorContext = new MockOperatorCoordinatorContext(new
OperatorID(), 1);
+ this.coordinator = new StreamWriteOperatorCoordinator(conf,
this.coordinatorContext);
this.functionInitializationContext = new
MockFunctionInitializationContext();
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
}
public void openFunction() throws Exception {
this.coordinator.start();
+ this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
toHoodieFunction = new
RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
toHoodieFunction.setRuntimeContext(runtimeContext);
toHoodieFunction.open(conf);
@@ -181,6 +186,10 @@ public class StreamWriteFunctionWrapper<I> {
return coordinator;
}
+ public MockOperatorCoordinatorContext getCoordinatorContext() {
+ return coordinatorContext;
+ }
+
public void clearIndexState() {
this.bucketAssignerFunction.clearIndexState();
}
@@ -188,8 +197,4 @@ public class StreamWriteFunctionWrapper<I> {
public boolean isKeyInState(HoodieKey hoodieKey) {
return this.bucketAssignerFunction.isKeyInState(hoodieKey);
}
-
- public boolean isAllPartitionsLoaded() {
- return this.bucketAssignerFunction.isAllPartitionsLoaded();
- }
}