This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8f23fc298773 feat: Support bucket assign operator fetching inflight
instants from coordinator (#17885)
8f23fc298773 is described below
commit 8f23fc298773f3beb67edda66757ab81698ccf8f
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon Jan 19 12:48:26 2026 +0800
feat: Support bucket assign operator fetching inflight instants from
coordinator (#17885)
---
.../apache/hudi/configuration/FlinkOptions.java | 3 +-
.../apache/hudi/configuration/OptionsResolver.java | 2 +-
.../hudi/sink/StreamWriteOperatorCoordinator.java | 20 +++++-
.../org/apache/hudi/sink/event/Correspondent.java | 42 +++++++++++++
.../sink/partitioner/BucketAssignFunction.java | 11 +++-
.../partitioner/MiniBatchBucketAssignOperator.java | 25 +++++++-
.../partitioner/MinibatchBucketAssignFunction.java | 11 +++-
.../hudi/sink/partitioner/index/IndexBackend.java | 7 ++-
.../sink/partitioner/index/RecordIndexCache.java | 10 ++-
.../partitioner/index/RecordLevelIndexBackend.java | 6 +-
.../org/apache/hudi/sink/utils/EventBuffers.java | 7 +++
.../hudi/sink/utils/OperatorIDGenerator.java | 49 +++++++++++++++
.../java/org/apache/hudi/sink/utils/Pipelines.java | 12 ++--
.../sink/TestStreamWriteOperatorCoordinator.java | 41 ++++++++++++
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 19 ++++++
.../org/apache/hudi/sink/TestWriteMergeOnRead.java | 5 ++
.../hudi/sink/TestWriteMergeOnReadWithCompact.java | 5 ++
.../TestMinibatchBucketAssignFunction.java | 72 ++++++++++++++--------
.../index/TestRecordLevelIndexBackend.java | 10 ++-
.../apache/hudi/sink/utils/MockCorrespondent.java | 12 ++++
.../sink/utils/StreamWriteFunctionWrapper.java | 11 +++-
.../hudi/sink/utils/TestFunctionWrapper.java | 8 +++
.../org/apache/hudi/sink/utils/TestWriteBase.java | 13 ++++
.../apache/hudi/table/ITTestHoodieDataSource.java | 9 ++-
24 files changed, 357 insertions(+), 53 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 195db61e5d81..773c8721900f 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -290,7 +290,8 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("The maximum number of input records can be buffered
for miniBatch during record index lookup.\n"
+ "MiniBatch is an optimization to buffer input records to reduce
the number of individual index lookups,\n"
+ "which can significantly improve performance compared to
processing each record individually.\n"
- + "Set to 0 to disable mini-batch processing.");
+ + "Default value is 1000, which is also the minimum value for the
minibatch size, when the configured size\n"
+ + "is less than 1000, the default value will be used.");
// ------------------------------------------------------------------------
// Read Options
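For reference, a minimal sketch of the clamping rule this description
documents; the helper and its names are hypothetical, only the 1000
default/minimum comes from this hunk (it mirrors the Math.max(...) applied
in MinibatchBucketAssignFunction later in this commit):

// Hypothetical helper illustrating the option's clamping behavior.
final class MiniBatchSizeResolver {
  private static final int DEFAULT_MINIBATCH_SIZE = 1000; // default and minimum

  // Configured values below the default fall back to the default.
  static int effectiveMiniBatchSize(int configured) {
    return Math.max(configured, DEFAULT_MINIBATCH_SIZE);
  }

  public static void main(String[] args) {
    System.out.println(effectiveMiniBatchSize(200));  // 1000
    System.out.println(effectiveMiniBatchSize(5000)); // 5000
  }
}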
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index ff9b519b60d3..6f083daac07a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -187,7 +187,7 @@ public class OptionsResolver {
*/
public static boolean isMiniBatchBucketAssign(Configuration conf) {
HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
- return indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX &&
conf.get(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE) > 0;
+ return indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX;
}
/**
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 2e8b9f8a97df..ced38c436cc8 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -171,7 +171,7 @@ public class StreamWriteOperatorCoordinator
protected NonThrownExecutor executor;
/**
- * A single-thread executor to handle the instant time request.
+ * A single-thread executor to handle coordination requests from
operators.
*/
protected NonThrownExecutor instantRequestExecutor;
@@ -372,10 +372,19 @@ public class StreamWriteOperatorCoordinator
@Override
public CompletableFuture<CoordinationResponse>
handleCoordinationRequest(CoordinationRequest request) {
+ if (request instanceof Correspondent.InstantTimeRequest) {
+ return handleInstantRequest((Correspondent.InstantTimeRequest) request);
+ }
+ if (request instanceof Correspondent.InflightInstantsRequest) {
+ return
handleInFlightInstantsRequest((Correspondent.InflightInstantsRequest) request);
+ }
+ throw new HoodieException("Unexpected coordination request type: " +
request.getClass().getSimpleName());
+ }
+
+ private CompletableFuture<CoordinationResponse>
handleInstantRequest(Correspondent.InstantTimeRequest request) {
CompletableFuture<CoordinationResponse> response = new
CompletableFuture<>();
instantRequestExecutor.execute(() -> {
- Correspondent.InstantTimeRequest instantTimeRequest =
(Correspondent.InstantTimeRequest) request;
- long checkpointId = instantTimeRequest.getCheckpointId();
+ long checkpointId = request.getCheckpointId();
Pair<String, WriteMetadataEvent[]> instantTimeAndEventBuffer =
this.eventBuffers.getInstantAndEventBuffer(checkpointId);
final String instantTime;
if (instantTimeAndEventBuffer == null) {
@@ -391,6 +400,11 @@ public class StreamWriteOperatorCoordinator
return response;
}
+ private CompletableFuture<CoordinationResponse>
handleInFlightInstantsRequest(Correspondent.InflightInstantsRequest request) {
+ CoordinationResponse coordinationResponse =
Correspondent.InflightInstantsResponse.getInstance(eventBuffers.getAllCheckpointIdAndInstants());
+ return
CompletableFuture.completedFuture(CoordinationResponseSerDe.wrap(coordinationResponse));
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
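For reference, a simplified model of what the new inflight-instants request
answers with: a snapshot of checkpoint id -> inflight instant taken from the
coordinator's event buffers (compare EventBuffers#getAllCheckpointIdAndInstants
later in this commit). The class below is a stand-in, not the coordinator
itself:

import java.util.HashMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class InflightSnapshotSketch {
  private final ConcurrentSkipListMap<Long, String> instantByCheckpoint =
      new ConcurrentSkipListMap<>();

  void onInstantStarted(long checkpointId, String instantTime) {
    instantByCheckpoint.put(checkpointId, instantTime);
  }

  // Copy into a plain HashMap so the snapshot is detached from the
  // coordinator's mutable state before it is serialized back to the task.
  HashMap<Long, String> snapshotInflightInstants() {
    return new HashMap<>(instantByCheckpoint);
  }

  public static void main(String[] args) {
    InflightSnapshotSketch sketch = new InflightSnapshotSketch();
    sketch.onInstantStarted(1L, "instant-1");
    sketch.onInstantStarted(2L, "instant-2");
    System.out.println(sketch.snapshotInflightInstants()); // {1=instant-1, 2=instant-2}
  }
}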
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
index 5176c5ec3e30..ea5fd557c8ee 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
@@ -31,6 +31,9 @@ import
org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.util.SerializedValue;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Correspondent between a write task and the coordinator.
*/
@@ -72,6 +75,19 @@ public class Correspondent {
}
}
+ /**
+ * Sends a request to the coordinator to fetch the inflight instants.
+ */
+ public Map<Long, String> requestInflightInstants() {
+ try {
+ InflightInstantsResponse response =
CoordinationResponseSerDe.unwrap(this.gateway.sendRequestToCoordinator(this.operatorID,
+ new SerializedValue<>(InflightInstantsRequest.getInstance())).get());
+ return response.getInflightInstants();
+ } catch (Exception e) {
+ throw new HoodieException("Error requesting the instant time from the
coordinator", e);
+ }
+ }
+
/**
* A request for instant time with a given checkpoint id.
*/
@@ -99,4 +115,30 @@ public class Correspondent {
return new InstantTimeResponse(instant);
}
}
+
+ /**
+ * A request for the current inflight instants in the coordinator.
+ */
+ @AllArgsConstructor(access = AccessLevel.PRIVATE)
+ @Getter
+ public static class InflightInstantsRequest implements CoordinationRequest {
+
+ public static InflightInstantsRequest getInstance() {
+ return new InflightInstantsRequest();
+ }
+ }
+
+ /**
+ * A response with the inflight instants.
+ */
+ @AllArgsConstructor(access = AccessLevel.PRIVATE)
+ @Getter
+ public static class InflightInstantsResponse implements CoordinationResponse
{
+
+ private final HashMap<Long, String> inflightInstants;
+
+ public static InflightInstantsResponse getInstance(HashMap<Long, String>
inflightInstants) {
+ return new InflightInstantsResponse(inflightInstants);
+ }
+ }
}
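For reference, a minimal, Flink-free sketch of the round-trip that
requestInflightInstants() performs; the gateway interface is a stand-in for
Flink's coordinator gateway, not its real API:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class CorrespondentSketch {
  // Stand-in for Flink's coordinator gateway.
  interface CoordinatorGateway {
    CompletableFuture<Map<Long, String>> requestInflightInstants();
  }

  private final CoordinatorGateway gateway;

  CorrespondentSketch(CoordinatorGateway gateway) {
    this.gateway = gateway;
  }

  // Blocks on the coordinator's answer; failures surface as runtime
  // exceptions, mirroring requestInflightInstants() in the hunk above.
  Map<Long, String> requestInflightInstants() {
    try {
      return gateway.requestInflightInstants().get();
    } catch (Exception e) {
      throw new RuntimeException(
          "Error requesting the inflight instants from the coordinator", e);
    }
  }

  public static void main(String[] args) {
    Map<Long, String> inflight = new HashMap<>();
    inflight.put(1L, "instant-1"); // example checkpoint id -> instant time
    CorrespondentSketch sketch =
        new CorrespondentSketch(() -> CompletableFuture.completedFuture(inflight));
    System.out.println(sketch.requestInflightInstants()); // {1=instant-1}
  }
}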
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 7a79526f9d49..e156ec7d5df2 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -30,6 +30,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.sink.partitioner.index.IndexBackend;
import org.apache.hudi.sink.partitioner.index.IndexBackendFactory;
import org.apache.hudi.table.action.commit.BucketInfo;
@@ -38,6 +39,7 @@ import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.utils.RuntimeContextUtils;
import lombok.Getter;
+import lombok.Setter;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -91,6 +93,12 @@ public class BucketAssignFunction
private final boolean isChangingRecords;
+ /**
+ * Correspondent to fetch the inflight instants from the coordinator.
+ */
+ @Setter
+ protected transient Correspondent correspondent;
+
/**
* If the index is global, update the index for the old partition path
* if a record with the same key but a different partition path comes in.
@@ -212,8 +220,7 @@ public class BucketAssignFunction
public void notifyCheckpointComplete(long checkpointId) {
// Refresh the table state when there are new commits.
this.bucketAssigner.reload(checkpointId);
- // todo #17700: check the file based mapping between checkpoint id and
instant to get the latest successful instant.
- this.indexBackend.onCommitSuccess(checkpointId - 1);
+ this.indexBackend.onCheckpointComplete(this.correspondent);
}
@Override
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MiniBatchBucketAssignOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MiniBatchBucketAssignOperator.java
index e16cbb024788..1f97c10e0587 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MiniBatchBucketAssignOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MiniBatchBucketAssignOperator.java
@@ -19,9 +19,15 @@
package org.apache.hudi.sink.partitioner;
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.sink.event.Correspondent;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
/**
* An operator that performs mini-batch bucket assignment for incoming records.
@@ -36,17 +42,32 @@ import
org.apache.flink.streaming.api.operators.ProcessOperator;
*/
public class MiniBatchBucketAssignOperator extends
ProcessOperator<HoodieFlinkInternalRow, HoodieFlinkInternalRow> implements
BoundedOneInput {
- /** The underlying function that performs the actual bucket assignment
logic. */
+ /**
+ * The underlying function that performs the actual bucket assignment logic.
+ */
private final MinibatchBucketAssignFunction bucketAssignFunction;
+ /**
+ * OperatorId for the data write operator.
+ */
+ private final OperatorID dataWriteOperatorId;
+
/**
* Constructs a MiniBatchBucketAssignOperator with the specified bucket
assignment function.
*
* @param bucketAssignFunction the function responsible for performing the
bucket assignment logic
*/
- public MiniBatchBucketAssignOperator(MinibatchBucketAssignFunction
bucketAssignFunction) {
+ public MiniBatchBucketAssignOperator(MinibatchBucketAssignFunction
bucketAssignFunction, OperatorID dataWriteOperatorId) {
super(bucketAssignFunction);
this.bucketAssignFunction = bucketAssignFunction;
+ this.dataWriteOperatorId = dataWriteOperatorId;
+ }
+
+ @Override
+ public void setup(StreamTask<?, ?> containingTask, StreamConfig config,
Output<StreamRecord<HoodieFlinkInternalRow>> output) {
+ super.setup(containingTask, config, output);
+
this.bucketAssignFunction.setCorrespondent(Correspondent.getInstance(dataWriteOperatorId,
+
getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway()));
}
/**
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
index c40e5a43b101..70b09621aa4d 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
@@ -21,9 +21,12 @@ package org.apache.hudi.sink.partitioner;
import org.apache.hudi.adapter.ProcessFunctionAdapter;
import org.apache.hudi.client.model.HoodieFlinkInternalRow;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.sink.partitioner.index.MinibatchIndexBackend;
+import lombok.Getter;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
@@ -60,6 +63,8 @@ public class MinibatchBucketAssignFunction
/**
* The maximum number of input records that can be buffered for MiniBatch.
*/
+ @VisibleForTesting
+ @Getter
private final int miniBatchSize;
/**
@@ -75,7 +80,7 @@ public class MinibatchBucketAssignFunction
this.delegateFunction = new BucketAssignFunction(conf);
this.isChangingRecords = WriteOperationType.isChangingRecords(
WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION)));
- this.miniBatchSize =
conf.get(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE);
+ this.miniBatchSize =
Math.max(conf.get(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE),
FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE.defaultValue());
}
@Override
@@ -147,6 +152,10 @@ public class MinibatchBucketAssignFunction
processBufferedRecords(outCollector);
}
+ public void setCorrespondent(Correspondent correspondent) {
+ this.delegateFunction.setCorrespondent(correspondent);
+ }
+
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
// Refresh the table state when there are new commits.
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
index e89bb4fe9ebe..74d98c46ab14 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
@@ -19,6 +19,7 @@
package org.apache.hudi.sink.partitioner.index;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.sink.event.Correspondent;
import java.io.Closeable;
import java.io.IOException;
@@ -55,11 +56,11 @@ public interface IndexBackend extends Closeable {
}
/**
- * Listener method called when the instant associated with {@code
checkpointId} is committed successfully.
+ * Listener method called when the bucket assign operator receives a
checkpoint-complete notification.
*
- * @param checkpointId checkpoint id.
+ * @param correspondent The Correspondent used to get inflight instants from
the coordinator.
*/
- default void onCommitSuccess(long checkpointId) {
+ default void onCheckpointComplete(Correspondent correspondent) {
// do nothing.
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
index 6ee3084f5460..267025ba4519 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordIndexCache.java
@@ -34,6 +34,7 @@ import org.apache.flink.configuration.Configuration;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Collections;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;
@@ -117,8 +118,15 @@ public class RecordIndexCache implements Closeable {
* @param checkpointId the id of checkpoint
*/
public void clean(long checkpointId) {
+ NavigableMap<Long, ExternalSpillableMap<String,
HoodieRecordGlobalLocation>> subMap;
+ if (checkpointId == Long.MAX_VALUE) {
+ // clean all the cache entries for old checkpoint ids and only keep
the cache for the maximum checkpoint id,
+ // which frees memory while still ensuring a reasonable cache hit
rate
+ subMap = caches.firstEntry() == null ? Collections.emptyNavigableMap() :
caches.tailMap(caches.firstKey(), false);
+ } else {
+ subMap = caches.tailMap(checkpointId, false);
+ }
// Get all entries that are strictly older than the given checkpointId
- NavigableMap<Long, ExternalSpillableMap<String,
HoodieRecordGlobalLocation>> subMap = caches.tailMap(checkpointId, false);
// Close all the ExternalSpillableMap instances before removing them
subMap.values().forEach(ExternalSpillableMap::close);
// Remove all the entries from the main cache
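For reference, a simplified, runnable model of the cleanup rule above. The
descending key order is inferred from the Long.MAX_VALUE branch, which keeps
only the first (newest) entry; plain strings stand in for the spillable maps:

import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

final class CheckpointCacheCleanSketch {
  // Caches keyed by checkpoint id in descending order, so tailMap(k, false)
  // is "everything strictly older than k".
  private final NavigableMap<Long, String> caches =
      new TreeMap<>(Comparator.reverseOrder());

  void put(long checkpointId, String cache) {
    caches.put(checkpointId, cache);
  }

  void clean(long checkpointId) {
    if (checkpointId == Long.MAX_VALUE) {
      // No inflight checkpoints: keep only the newest cache entry.
      if (!caches.isEmpty()) {
        caches.tailMap(caches.firstKey(), false).clear();
      }
    } else {
      // Drop every cache strictly older than the given checkpoint id.
      caches.tailMap(checkpointId, false).clear();
    }
  }

  public static void main(String[] args) {
    CheckpointCacheCleanSketch sketch = new CheckpointCacheCleanSketch();
    sketch.put(1L, "c1");
    sketch.put(2L, "c2");
    sketch.put(3L, "c3");
    sketch.clean(2L);                  // drops the cache for checkpoint 1
    System.out.println(sketch.caches); // {3=c3, 2=c2}
  }
}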
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
index bcd0dcc4b7b1..68200bf3ea13 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.util.StreamerUtil;
import lombok.Getter;
@@ -105,8 +106,9 @@ public class RecordLevelIndexBackend implements
MinibatchIndexBackend {
}
@Override
- public void onCommitSuccess(long checkpointId) {
- recordIndexCache.clean(checkpointId);
+ public void onCheckpointComplete(Correspondent correspondent) {
+ Map<Long, String> inflightInstants =
correspondent.requestInflightInstants();
+
recordIndexCache.clean(inflightInstants.keySet().stream().min(Long::compareTo).orElse(Long.MAX_VALUE));
this.metaClient.reloadActiveTimeline();
reloadMetadataTable();
}
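For reference, a small sketch of how the clean threshold is derived above:
the smallest inflight checkpoint id bounds what may be cleaned, and an empty
map degenerates to Long.MAX_VALUE, which RecordIndexCache#clean treats as
"keep only the newest cache":

import java.util.HashMap;
import java.util.Map;

final class CleanThresholdSketch {
  static long cleanThreshold(Map<Long, String> inflightInstants) {
    return inflightInstants.keySet().stream()
        .min(Long::compareTo)
        .orElse(Long.MAX_VALUE);
  }

  public static void main(String[] args) {
    Map<Long, String> inflight = new HashMap<>();
    inflight.put(3L, "instant-3");
    inflight.put(5L, "instant-5");
    System.out.println(cleanThreshold(inflight));        // 3
    System.out.println(cleanThreshold(new HashMap<>())); // 9223372036854775807
  }
}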
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
index d4eb6a393883..889326126e10 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
import java.io.Serializable;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -124,6 +125,12 @@ public class EventBuffers implements Serializable {
return this.eventBuffers.entrySet().stream();
}
+ public HashMap<Long, String> getAllCheckpointIdAndInstants() {
+ HashMap<Long, String> result = new HashMap<>(eventBuffers.size());
+ this.eventBuffers.forEach((k, v) -> result.put(k, v.getLeft()));
+ return result;
+ }
+
public void initNewEventBuffer(long checkpointId, String instantTime, int
parallelism) {
this.eventBuffers.put(checkpointId, Pair.of(instantTime, new
WriteMetadataEvent[parallelism]));
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/OperatorIDGenerator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/OperatorIDGenerator.java
new file mode 100644
index 000000000000..a0ad18bdc68a
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/OperatorIDGenerator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.shaded.curator5.com.google.common.hash.Hashing;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Generates {@link OperatorID}s for communication between Flink operators.
+ *
+ * <p>Operator ID generation is an internal implementation detail of Flink,
happening during the
+ * stream graph generation phase. The implementation here is the same as
Flink's to make
+ * sure that operators can reach each other on the cluster.
+ *
+ * @see org.apache.flink.state.api.runtime.OperatorIDGenerator
+ */
+public class OperatorIDGenerator {
+ private OperatorIDGenerator() {
+ }
+
+ /**
+ * Generate {@link OperatorID}'s from {@code uid}'s.
+ *
+ * @param uid {@code DataStream} operator uid.
+ * @return corresponding {@link OperatorID}
+ */
+ public static OperatorID fromUid(String uid) {
+ byte[] hash = Hashing.murmur3_128(0).newHasher().putString(uid,
UTF_8).hash().asBytes();
+ return new OperatorID(hash);
+ }
+}
\ No newline at end of file
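For reference, a usage sketch of the derivation above, assuming plain Guava
on the classpath in place of Flink's shaded copy; the uid value is
illustrative only:

import com.google.common.hash.Hashing;
import static java.nio.charset.StandardCharsets.UTF_8;

final class OperatorIdSketch {
  // Same murmur3_128(seed = 0) derivation as OperatorIDGenerator#fromUid.
  static byte[] fromUid(String uid) {
    return Hashing.murmur3_128(0).newHasher().putString(uid, UTF_8).hash().asBytes();
  }

  public static void main(String[] args) {
    byte[] a = fromUid("uid_stream_write_t1"); // hypothetical uid value
    byte[] b = fromUid("uid_stream_write_t1");
    // Deterministic: the same uid always yields the same 16-byte hash, which
    // is how the bucket assign and write operators agree on one OperatorID.
    System.out.println(java.util.Arrays.equals(a, b)); // true
  }
}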
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 5fbd3609abf6..ae0dce254222 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -384,7 +384,11 @@ public class Pipelines {
throw new HoodieNotSupportedException("Unknown bucket index engine
type: " + bucketIndexEngineType);
}
} else {
- DataStream<HoodieFlinkInternalRow> bucketAssignStream =
createBucketAssignStream(dataStream, conf, rowType);
+ // The uid is used to generate the operator id for the write operator, so
that the bucket assign operator can send
+ // operator events to the coordinator of the write operator based on that
operator id.
+ // @see org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway.
+ String writeOperatorUid = opUID("stream_write", conf);
+ DataStream<HoodieFlinkInternalRow> bucketAssignStream =
createBucketAssignStream(dataStream, conf, rowType, writeOperatorUid);
return bucketAssignStream
// shuffle by fileId(bucket id)
.keyBy(HoodieFlinkInternalRow::getFileId)
@@ -392,7 +396,7 @@ public class Pipelines {
opName("stream_write", conf),
TypeInformation.of(RowData.class),
StreamWriteOperator.getFactory(conf, rowType))
- .uid(opUID("stream_write", conf))
+ .uid(writeOperatorUid)
.setParallelism(conf.get(FlinkOptions.WRITE_TASKS));
}
}
@@ -406,7 +410,7 @@ public class Pipelines {
* @return A DataStream of HoodieFlinkInternalRow records with bucket
assignments
*/
private static DataStream<HoodieFlinkInternalRow> createBucketAssignStream(
- DataStream<HoodieFlinkInternalRow> inputStream, Configuration conf,
RowType rowType) {
+ DataStream<HoodieFlinkInternalRow> inputStream, Configuration conf,
RowType rowType, String writeOperatorUid) {
String assignerOperatorName = "bucket_assigner";
if (OptionsResolver.isMiniBatchBucketAssign(conf)) {
return inputStream
@@ -414,7 +418,7 @@ public class Pipelines {
.transform(
assignerOperatorName,
new HoodieFlinkInternalRowTypeInfo(rowType),
- new MiniBatchBucketAssignOperator(new
MinibatchBucketAssignFunction(conf)))
+ new MiniBatchBucketAssignOperator(new
MinibatchBucketAssignFunction(conf),
OperatorIDGenerator.fromUid(writeOperatorUid)))
.uid(opUID(assignerOperatorName, conf))
.setParallelism(conf.get(FlinkOptions.BUCKET_ASSIGN_TASKS));
} else {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 8a79c0ddbf11..9645e41d05d7 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -48,6 +48,7 @@ import org.apache.hudi.utils.TestUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -64,6 +65,7 @@ import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -561,6 +563,45 @@ public class TestStreamWriteOperatorCoordinator {
}
}
+ @Test
+ void testHandleInFlightInstantsRequest() throws Exception {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+ coordinator = createCoordinator(conf, 2);
+
+ // Request an instant time to create an initial instant
+ String instant1 = requestInstantTime(1);
+ assertThat(instant1, not(is("")));
+
+ // Add some events to the buffer to simulate ongoing processing
+ OperatorEvent event1 = createOperatorEvent(0, 1, instant1, "par1", false,
false, 0.1);
+ coordinator.handleEventFromOperator(0, event1);
+
+ // Request another instant time to create a second instant
+ String instant2 = requestInstantTime(2);
+ assertThat(instant2, not(is("")));
+
+ // Add more events for the second instant
+ OperatorEvent event2 = createOperatorEvent(1, 2, instant2, "par2", false,
false, 0.2);
+ coordinator.handleEventFromOperator(1, event2);
+
+ // Call handleCoordinationRequest with InflightInstantsRequest
+ CompletableFuture<CoordinationResponse> responseFuture =
+
coordinator.handleCoordinationRequest(Correspondent.InflightInstantsRequest.getInstance());
+
+ // Unwrap and verify the response
+ Correspondent.InflightInstantsResponse response =
+ CoordinationResponseSerDe.unwrap(responseFuture.get());
+
+ // Check that the response contains the expected checkpoint IDs and
instant times
+ Map<Long, String> inflightInstants = response.getInflightInstants();
+ assertEquals(2, inflightInstants.size());
+ assertTrue(inflightInstants.containsKey(1L));
+ assertTrue(inflightInstants.containsKey(2L));
+ assertEquals(instant1, inflightInstants.get(1L));
+ assertEquals(instant2, inflightInstants.get(2L));
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 6d89f521e995..e7e1f5b523c8 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -864,4 +864,23 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
.checkWrittenData(EXPECTED1)
.end();
}
+
+ @Test
+ public void testCacheCleanOfRecordIndexBackend() throws Exception {
+ // use record level index
+ conf.set(FlinkOptions.INDEX_TYPE, GLOBAL_RECORD_LEVEL_INDEX.name());
+
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
+ preparePipeline(conf)
+ // should be initialized with 1 inflight cache
+ .assertInflightCachesOfBucketAssigner(1)
+ .consume(TestData.DATA_SET_INSERT)
+ .checkpoint(1)
+ // a new cache is created since the checkpoint id advances to 1
+ .assertInflightCachesOfBucketAssigner(2)
+ .assertNextEvent(4, "par1,par2,par3,par4")
+ .checkpointComplete(1)
+ // cleans the first inflight cache, leaving only the latest inflight cache.
+ .assertInflightCachesOfBucketAssigner(1)
+ .checkWrittenData(EXPECTED1);
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index ff00ae50220d..d5056991469c 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -258,6 +258,11 @@ public class TestWriteMergeOnRead extends
TestWriteCopyOnWrite {
// can be re-enabled after #17701
}
+ @Test
+ public void testCacheCleanOfRecordIndexBackend() throws Exception {
+ // can be re-enabled after #17701
+ }
+
@Override
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED1;
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 63d087c5b28c..d5ec424f28c3 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -359,6 +359,11 @@ public class TestWriteMergeOnReadWithCompact extends
TestWriteCopyOnWrite {
// can be re-enabled after #17701
}
+ @Test
+ public void testCacheCleanOfRecordIndexBackend() throws Exception {
+ // can be re-enabled after #17701
+ }
+
@Override
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
index 06f8491165e2..6eb7fb16533f 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
@@ -26,6 +26,7 @@ import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.StringData;
@@ -58,8 +59,6 @@ public class TestMinibatchBucketAssignFunction {
conf = TestConfigurations.getDefaultConf(basePath);
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
conf.set(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
- // Set minibatch size to 3 for testing
- conf.set(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE, 3);
TestData.writeData(TestData.DATA_SET_INSERT, conf);
}
@@ -68,10 +67,19 @@ public class TestMinibatchBucketAssignFunction {
// Create the MinibatchBucketAssignFunction
MinibatchBucketAssignFunction function = new
MinibatchBucketAssignFunction(conf);
// Set up test harness
- testHarness = new OneInputStreamOperatorTestHarness<>(new
MiniBatchBucketAssignOperator(function), 1, 1, 0);
+ testHarness = new OneInputStreamOperatorTestHarness<>(new
MiniBatchBucketAssignOperator(function, new OperatorID()), 1, 1, 0);
testHarness.open();
}
+ @Test
+ public void testMinibatchSize() {
+ Configuration config = Configuration.fromMap(conf.toMap());
+ config.set(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE, 200);
+ MinibatchBucketAssignFunction function = new
MinibatchBucketAssignFunction(config);
+ // although the minibatch size is set to 200, the effective value is 1000
since the minimum allowed minibatch size is 1000.
+ assertEquals(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE.defaultValue(),
function.getMiniBatchSize());
+ }
+
@Test
public void testProcessElementWithBufferData() throws Exception {
// Create test records
@@ -79,8 +87,6 @@ public class TestMinibatchBucketAssignFunction {
insertRow(StringData.fromString("id1"),
StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1),
StringData.fromString("par1")));
HoodieFlinkInternalRow record2 = new HoodieFlinkInternalRow("id2", "par1",
"I",
insertRow(StringData.fromString("id2"),
StringData.fromString("Stephen"), 33, TimestampData.fromEpochMillis(2),
StringData.fromString("par1")));
- HoodieFlinkInternalRow record3 = new HoodieFlinkInternalRow("id10",
"par5", "I",
- insertRow(StringData.fromString("id10"),
StringData.fromString("Julian"), 53, TimestampData.fromEpochMillis(3),
StringData.fromString("par5")));
// Process first two records - they should be buffered
testHarness.processElement(new StreamRecord<>(record1));
@@ -90,18 +96,23 @@ public class TestMinibatchBucketAssignFunction {
List<HoodieFlinkInternalRow> output = testHarness.extractOutputValues();
assertEquals(0, output.size(), "Records should be buffered until batch
size is reached");
- // Process third record - this should trigger processing of the first two
buffered records
- testHarness.processElement(new StreamRecord<>(record3));
+ for (int i = 0; i < 1000; i++) {
+ // Process enough records to reach the minibatch size and trigger a
flush of the buffer
+ String recordKey = "new_key_" + i;
+ HoodieFlinkInternalRow record = new HoodieFlinkInternalRow(recordKey,
"par5", "I",
+ insertRow(StringData.fromString(recordKey),
StringData.fromString("Julian"), 53, TimestampData.fromEpochMillis(3),
StringData.fromString("par5")));
+ testHarness.processElement(new StreamRecord<>(record));
+ }
// Now we should have processed records
output = testHarness.extractOutputValues();
- assertEquals(3, output.size(), "All three records should be processed");
+ assertEquals(1000, output.size(), "All three records should be processed");
// Verify that the records have proper bucket assignments
for (HoodieFlinkInternalRow row : output) {
// Check that file ID and instant time are assigned
assertTrue(row.getFileId() != null && !row.getFileId().isEmpty(), "File
ID should be assigned");
- if (row.getRecordKey().equals("id10")) {
+ if (row.getRecordKey().startsWith("new_key")) {
assertEquals("I", row.getInstantTime(), "the record is an insert
record");
} else {
assertEquals("U", row.getInstantTime(), "the record is an update
record");
@@ -111,31 +122,40 @@ public class TestMinibatchBucketAssignFunction {
@Test
public void testProcessElementWithIndexRecords() throws Exception {
- // Create an index record (one that has isIndexRecord() returning true)
- HoodieFlinkInternalRow indexRecord = new HoodieFlinkInternalRow("id1",
"par1", "file_id1", "000");
- // For now, let's test with a regular record
- HoodieFlinkInternalRow record1 = new HoodieFlinkInternalRow("id1", "par1",
"I",
- insertRow(StringData.fromString("id1"),
StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1),
StringData.fromString("par1")));
- HoodieFlinkInternalRow record2 = new HoodieFlinkInternalRow("id2", "par1",
"I",
- insertRow(StringData.fromString("id2"),
StringData.fromString("Stephen"), 33, TimestampData.fromEpochMillis(2),
StringData.fromString("par1")));
- HoodieFlinkInternalRow record3 = new HoodieFlinkInternalRow("id3", "par2",
"I",
- insertRow(StringData.fromString("id3"),
StringData.fromString("Julian"), 53, TimestampData.fromEpochMillis(3),
StringData.fromString("par2")));
-
- testHarness.processElement(new StreamRecord<>(indexRecord));
+ // insert 500 index records
+ for (int i = 0; i < 500; i++) {
+ // index records are consumed directly without being buffered
+ String recordKey = "index_new_key_" + i;
+ // Create an index record (one that has isIndexRecord() returning true)
+ HoodieFlinkInternalRow indexRecord = new
HoodieFlinkInternalRow(recordKey, "par1", "file_id1", "000");
+ testHarness.processElement(new StreamRecord<>(indexRecord));
+ }
- testHarness.processElement(new StreamRecord<>(record1));
- testHarness.processElement(new StreamRecord<>(record2));
+ // insert 500 regular records
+ for (int i = 0; i < 500; i++) {
+ // regular records are buffered until the minibatch size is reached
+ String recordKey = "new_key_" + i;
+ HoodieFlinkInternalRow record = new HoodieFlinkInternalRow(recordKey,
"par5", "I",
+ insertRow(StringData.fromString(recordKey),
StringData.fromString("Julian"), 53, TimestampData.fromEpochMillis(3),
StringData.fromString("par5")));
+ testHarness.processElement(new StreamRecord<>(record));
+ }
// index records will not be buffered, so the buffer will not be flushed
List<HoodieFlinkInternalRow> output = testHarness.extractOutputValues();
assertEquals(0, output.size(), "Record should be buffered since batch size
is 2");
-
- // Add another record to trigger buffer processing
- testHarness.processElement(new StreamRecord<>(record3));
+
+ // insert another 500 regular records
+ for (int i = 0; i < 500; i++) {
+ // these records fill the buffer to the minibatch size and trigger a
flush
+ String recordKey = "new_key_" + i;
+ HoodieFlinkInternalRow record = new HoodieFlinkInternalRow(recordKey,
"par5", "I",
+ insertRow(StringData.fromString(recordKey),
StringData.fromString("Julian"), 53, TimestampData.fromEpochMillis(3),
StringData.fromString("par5")));
+ testHarness.processElement(new StreamRecord<>(record));
+ }
// the expected size is 1000, index records excluded
output = testHarness.extractOutputValues();
- assertEquals(3, output.size(), "Both records should be processed");
+ assertEquals(1000, output.size(), "Both records should be processed");
}
@Test
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
index 0b10bc4f4dc9..49d4370266e8 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.partitioner.index;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -35,6 +36,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -42,6 +44,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Test cases for {@link RecordLevelIndexBackend}.
@@ -98,7 +102,11 @@ public class TestRecordLevelIndexBackend {
assertEquals(newLocation, location);
// previous instant commit success, clean
- recordLevelIndexBackend.onCommitSuccess(1);
+ Correspondent correspondent = mock(Correspondent.class);
+ Map<Long, String> inflightInstants = new HashMap<>();
+ inflightInstants.put(1L, "0001");
+
when(correspondent.requestInflightInstants()).thenReturn(inflightInstants);
+ recordLevelIndexBackend.onCheckpointComplete(correspondent);
assertEquals(1,
recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
// the cache will only contain 'new_key', others are cleaned.
location = recordLevelIndexBackend.getRecordIndexCache().get("new_key");
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCorrespondent.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCorrespondent.java
index 6669ed9f5ace..7d4bb95416cc 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCorrespondent.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockCorrespondent.java
@@ -22,6 +22,8 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.event.Correspondent;
+import java.util.Map;
+
/**
* A mock {@link Correspondent} that always returns the latest instant.
*/
@@ -40,4 +42,14 @@ public class MockCorrespondent extends Correspondent {
throw new HoodieException("Error requesting the instant time from the
coordinator", e);
}
}
+
+ @Override
+ public Map<Long, String> requestInflightInstants() {
+ try {
+ InflightInstantsResponse response =
CoordinationResponseSerDe.unwrap(this.coordinator.handleCoordinationRequest(InflightInstantsRequest.getInstance()).get());
+ return response.getInflightInstants();
+ } catch (Exception e) {
+ throw new HoodieException("Error requesting the inflight instants from
the coordinator", e);
+ }
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index bd4c5f7153b0..5ed0bc961de7 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -83,6 +83,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
private final MockOperatorCoordinatorContext coordinatorContext;
@Getter
private StreamWriteOperatorCoordinator coordinator;
+ private MockCorrespondent correspondent;
private final MockStateInitializationContext stateInitializationContext;
private final TreeMap<Long, byte[]> coordinatorStateStore;
private final KeyedProcessFunction.Context context;
@@ -134,6 +135,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
// one function
this.coordinatorContext = new MockOperatorCoordinatorContext(new
OperatorID(), 1);
this.coordinator = new StreamWriteOperatorCoordinator(conf,
this.coordinatorContext);
+ this.correspondent = new MockCorrespondent(coordinator);
this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
this.stateInitializationContext = new MockStateInitializationContext();
this.coordinatorStateStore = new TreeMap<>();
@@ -158,6 +160,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
bucketAssignerFunction = new BucketAssignFunction(conf);
bucketAssignerFunction.setRuntimeContext(runtimeContext);
+ bucketAssignerFunction.setCorrespondent(correspondent);
bucketAssignerFunction.open(conf);
bucketAssignerFunction.initializeState(this.stateInitializationContext);
@@ -285,6 +288,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
resetCoordinatorToCheckpoint();
this.coordinator.start();
this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
+ this.correspondent = new MockCorrespondent(coordinator);
}
public void checkpointFails(long checkpointId) {
@@ -313,6 +317,11 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
return this.writeFunction;
}
+ @Override
+ public BucketAssignFunction getBucketAssignFunction() {
+ return this.bucketAssignerFunction;
+ }
+
public boolean isKeyInState(HoodieKey hoodieKey) {
return
this.bucketAssignFunctionContext.isKeyInState(hoodieKey.getRecordKey());
}
@@ -331,7 +340,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
writeFunction.setOperatorEventGateway(gateway);
writeFunction.initializeState(this.stateInitializationContext);
writeFunction.open(conf);
- writeFunction.setCorrespondent(new MockCorrespondent(this.coordinator));
+ writeFunction.setCorrespondent(correspondent);
}
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
index b380879b3836..fdb173bc2fd5 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -121,6 +122,13 @@ public interface TestFunctionWrapper<I> {
*/
AbstractWriteFunction getWriteFunction();
+ /**
+ * Returns the bucket assign function.
+ */
+ default BucketAssignFunction getBucketAssignFunction() {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Returns the data buffer of the write task.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index b59d32ebdbc0..b9e4847ca6ee 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -35,6 +35,8 @@ import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.partitioner.index.IndexBackend;
+import org.apache.hudi.sink.partitioner.index.RecordLevelIndexBackend;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -313,6 +315,17 @@ public class TestWriteBase {
return this;
}
+ /**
+ * Asserts the number of inflight caches in the record index cache of the
bucket assigner.
+ */
+ public TestHarness assertInflightCachesOfBucketAssigner(int expected) {
+ IndexBackend indexBackend =
pipeline.getBucketAssignFunction().getIndexBackend();
+ if (indexBackend instanceof RecordLevelIndexBackend) {
+ assertEquals(expected, ((RecordLevelIndexBackend)
indexBackend).getRecordIndexCache().getCaches().size());
+ }
+ return this;
+ }
+
/**
* Checkpoints the pipeline, which triggers the data write and event send.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 17bd2d77faa1..60a55a74f1a5 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -94,6 +94,7 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
+import static java.util.Arrays.stream;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ;
import static org.apache.hudi.utils.TestConfigurations.catalog;
@@ -2903,14 +2904,12 @@ public class ITTestHoodieDataSource {
+ "+I[id3, id3, Julian, 43, 1970-01-01T00:00:03, par1, par1]]");
}
- @ParameterizedTest
- @ValueSource(ints = {-1, 100})
- void testMiniBatchBucketAssign(int bucketAssignMinibatchSize) throws
Exception {
- TableEnvironment tableEnv = batchTableEnv;
+ @Test
+ void testMiniBatchBucketAssign() throws Exception {
+ TableEnvironment tableEnv = streamTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name())
- .option(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE,
bucketAssignMinibatchSize)
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
.option(FlinkOptions.TABLE_TYPE, COPY_ON_WRITE)
.end();