This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d69826cc5dc2 fix(flink): Trigger a failover after pending instants
recommitted to bootstrap RLI completely (#18434)
d69826cc5dc2 is described below
commit d69826cc5dc263bce8d3edc8b69d4e7ceba3c045
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Apr 3 09:48:06 2026 +0800
fix(flink): Trigger a failover after pending instants recommitted to
bootstrap RLI completely (#18434)
* fix(flink): Trigger a failover after pending instants recommitted to
bootstrap RLI completely
* fix comments
---
.../apache/hudi/configuration/FlinkOptions.java | 8 --
.../hudi/sink/StreamWriteOperatorCoordinator.java | 48 +++++++-----
.../hudi/sink/bootstrap/RLIBootstrapOperator.java | 89 +---------------------
.../org/apache/hudi/sink/event/Correspondent.java | 38 ---------
.../org/apache/hudi/sink/utils/EventBuffers.java | 15 +---
.../java/org/apache/hudi/sink/utils/Pipelines.java | 7 +-
.../sink/TestStreamWriteOperatorCoordinator.java | 28 -------
.../org/apache/hudi/sink/TestWriteMergeOnRead.java | 51 +++++++++++++
.../hudi/sink/utils/MockStateSnapshotContext.java | 41 ++++++++++
.../sink/utils/StreamWriteFunctionWrapper.java | 48 +++++++++++-
.../apache/hudi/sink/utils/TestEventBuffers.java | 42 ----------
.../org/apache/hudi/sink/utils/TestWriteBase.java | 13 ++++
12 files changed, 180 insertions(+), 248 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index f2907b06d66b..cf382e748fe8 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -908,14 +908,6 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(ClientIds.INIT_CLIENT_ID)
.withDescription("Unique identifier used to distinguish different writer
pipelines for concurrent mode");
- // this is only for internal use
- @AdvancedConfig
- public static final ConfigOption<String> WRITE_OPERATOR_UID = ConfigOptions
- .key("write.operator.uid")
- .stringType()
- .noDefaultValue()
- .withDescription("The write operator uid used as the uid for hudi sink
transformation.");
-
// ------------------------------------------------------------------------
// Compaction Options
// ------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 1c1add8b721b..82fcb3ed5c57 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -327,6 +327,13 @@ public class StreamWriteOperatorCoordinator
if (checkpointData != null) {
initEventBufferIfNecessary();
this.eventBuffers.addEventsToBuffer(SerializationUtils.deserialize(checkpointData));
+ // resetToCheckpoint() is called in two cases:
+ // 1. The job is restarted from state, start() will be called later.
+ // 2. The job is recovered from global failover. The coordinator is
already started, and start() will not be called again.
+ if (executor != null && tableState.isRLIWithBootstrap) {
+ // use sync execution here to make sure the recommitting finishes
before RLI bootstrapping
+ this.executor.executeSync(this::restoreEvents, "Recommit pending
instants on resetting to checkpoint: %s.", checkpointID);
+ }
}
}
@@ -364,8 +371,12 @@ public class StreamWriteOperatorCoordinator
}
@Override
- public void subtaskReset(int i, long l) {
- // no operation
+ public void subtaskReset(int i, long resetCkpId) {
+ // There exists pending instants waiting for recommiting.
+ if (tableState.isRLIWithBootstrap &&
!eventBuffers.getPendingInstantsBefore(resetCkpId).isEmpty()) {
+ // use sync execution here to make sure the recommitting finishes before
RLI bootstrapping
+ executor.executeSync(() -> commitInstants(resetCkpId), "Recommit pending
instants on resetting subtask %s to checkpoint: %s.", i, resetCkpId);
+ }
}
@VisibleForTesting
@@ -391,9 +402,6 @@ public class StreamWriteOperatorCoordinator
if (request instanceof Correspondent.InflightInstantsRequest) {
return
handleInFlightInstantsRequest((Correspondent.InflightInstantsRequest) request);
}
- if (request instanceof Correspondent.AwaitPendingInstantsRequest) {
- return
handleAwaitPendingInstantsRequest((Correspondent.AwaitPendingInstantsRequest)
request);
- }
throw new HoodieException("Unexpected coordination request type: " +
request.getClass().getSimpleName());
}
@@ -421,16 +429,6 @@ public class StreamWriteOperatorCoordinator
return
CompletableFuture.completedFuture(CoordinationResponseSerDe.wrap(coordinationResponse));
}
- private CompletableFuture<CoordinationResponse>
handleAwaitPendingInstantsRequest(Correspondent.AwaitPendingInstantsRequest
request) {
- CompletableFuture<CoordinationResponse> response = new
CompletableFuture<>();
- instantRequestExecutor.execute(() -> {
- // wait until receiving any bootstrap event.
- eventBuffers.awaitPrevInstantsToComplete(request.getCheckpointId());
-
response.complete(CoordinationResponseSerDe.wrap(Correspondent.AwaitPendingInstantsResponse.getInstance()));
- }, "await pending instants to complete");
- return response;
- }
-
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
@@ -532,16 +530,16 @@ public class StreamWriteOperatorCoordinator
* Recommits the last inflight instant if the write metadata checkpoint
successfully
* but was not committed due to some rare cases.
*/
- private void recommitInstant(long checkpointId, String instant, EventBuffer
bootstrapBuffer) {
+ private boolean recommitInstant(long checkpointId, String instant,
EventBuffer bootstrapBuffer) {
HoodieTimeline completedTimeline =
this.metaClient.getActiveTimeline().filterCompletedInstants();
- recommitInstant(completedTimeline, checkpointId, instant, bootstrapBuffer);
+ return recommitInstant(completedTimeline, checkpointId, instant,
bootstrapBuffer);
}
/**
* Recommits the last inflight instant if the write metadata checkpoint
successfully
* but was not committed due to some rare cases.
*/
- private void recommitInstant(HoodieTimeline completedTimeline, long
checkpointId, String instant, EventBuffer bootstrapBuffer) {
+ private boolean recommitInstant(HoodieTimeline completedTimeline, long
checkpointId, String instant, EventBuffer bootstrapBuffer) {
if (!completedTimeline.containsInstant(instant)) {
log.info("Recommit instant {}", instant);
// Recommit should start heartbeat for lazy failed writes clean policy
to avoid aborting for heartbeat expired;
@@ -549,7 +547,11 @@ public class StreamWriteOperatorCoordinator
if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
writeClient.getHeartbeatClient().start(instant);
}
- commitInstant(checkpointId, instant, bootstrapBuffer);
+ return commitInstant(checkpointId, instant, bootstrapBuffer);
+ } else {
+ // clean the corresponding event buffer if the instant is already
committed.
+ eventBuffers.reset(checkpointId);
+ return false;
}
}
@@ -562,7 +564,11 @@ public class StreamWriteOperatorCoordinator
eventBuffer.addBootstrapEvent(event);
if (eventBuffer.allBootstrapEventsReceived()) {
// start to recommit the instant.
- recommitInstant(event.getCheckpointId(), event.getInstantTime(),
eventBuffer);
+ boolean committed = recommitInstant(event.getCheckpointId(),
event.getInstantTime(), eventBuffer);
+ if (committed && tableState.isRLIWithBootstrap) {
+ context.failJob(new HoodieException("There are pending instants
recommitted on job restart,"
+ + "triggering a global failover so that RLI bootstrap operator can
load the record level index completely."));
+ }
}
}
@@ -753,6 +759,7 @@ public class StreamWriteOperatorCoordinator
final boolean syncMetadata;
final boolean isDeltaTimeCompaction;
final boolean isStreamingIndexWriteEnabled;
+ final boolean isRLIWithBootstrap;
private TableState(Configuration conf) {
this.operationType =
WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION));
@@ -766,6 +773,7 @@ public class StreamWriteOperatorCoordinator
this.syncMetadata = conf.get(FlinkOptions.METADATA_ENABLED);
this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
this.isStreamingIndexWriteEnabled =
OptionsResolver.isStreamingIndexWriteEnabled(conf);
+ this.isRLIWithBootstrap = OptionsResolver.isRLIWithBootstrap(conf);
}
public static TableState create(Configuration conf) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
index 55b5d65cf232..515b180d3cad 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
@@ -25,32 +25,19 @@ import
org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
-import org.apache.hudi.sink.event.Correspondent;
-import org.apache.hudi.sink.utils.OperatorIDGenerator;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.RuntimeContextUtils;
import lombok.extern.slf4j.Slf4j;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
/**
* Bootstrap operator that loads record level index (RLI) data from metadata
table.
@@ -65,60 +52,17 @@ import java.util.stream.StreamSupport;
public class RLIBootstrapOperator
extends AbstractBootstrapOperator {
- private final OperatorID dataWriteOperatorId;
-
- private transient HoodieTableMetaClient metaClient;
private transient HoodieBackedTableMetadata metadataTable;
- private transient Correspondent correspondent;
private transient long loadedCnt;
- /**
- * The last checkpoint id, starts from -1.
- */
- private long checkpointId = -1;
-
- /**
- * List state of the JobID.
- */
- private transient ListState<JobID> jobIdState;
-
public RLIBootstrapOperator(Configuration conf) {
super(conf);
- String writeOperatorUid = conf.get(FlinkOptions.WRITE_OPERATOR_UID);
- ValidationUtils.checkArgument(writeOperatorUid != null,
- "Write operator UID should not be null when index is Record Level
Index.");
- this.dataWriteOperatorId = OperatorIDGenerator.fromUid(writeOperatorUid);
- }
-
- @Override
- public void setup(StreamTask<?, ?> containingTask, StreamConfig config,
Output<StreamRecord<HoodieFlinkInternalRow>> output) {
- super.setup(containingTask, config, output);
- this.correspondent = Correspondent.getInstance(dataWriteOperatorId,
-
getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway());
}
@Override
public void initializeState(StateInitializationContext context) throws
Exception {
- this.jobIdState = context.getOperatorStateStore().getListState(
- new ListStateDescriptor<>(
- "job-id-state",
- TypeInformation.of(JobID.class)
- ));
loadedCnt = 0;
-
- int attemptId = RuntimeContextUtils.getAttemptNumber(getRuntimeContext());
- if (context.isRestored()) {
- initCheckpointId(attemptId,
context.getRestoredCheckpointId().orElse(-1L));
- }
-
- if (context.isRestored()) {
- // Wait for pending instants being committed successfully before loading
the record index
- log.info("Waiting for pending instants committed before RLI bootstrap.");
- correspondent.awaitPendingInstantsCommitted(checkpointId);
- log.info("All pending instants are completed, continue RLI bootstrap.");
- }
-
- this.metaClient = StreamerUtil.createMetaClient(conf);
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
this.metadataTable = (HoodieBackedTableMetadata)
metaClient.getTableFormat().getMetadataFactory().create(
HoodieFlinkEngineContext.DEFAULT,
metaClient.getStorage(),
@@ -128,15 +72,6 @@ public class RLIBootstrapOperator
preLoadRLIRecords();
}
- @Override
- public void snapshotState(StateSnapshotContext context) throws Exception {
- super.snapshotState(context);
- // Reload the job ID state
- reloadJobIdState();
- // Update checkpoint id
- this.checkpointId = context.getCheckpointId();
- }
-
@Override
public void close() throws Exception {
closeMetadataTable();
@@ -147,28 +82,6 @@ public class RLIBootstrapOperator
// Utilities
// -------------------------------------------------------------------------
- /**
- * Reload the job id state as the current job id.
- */
- private void reloadJobIdState() throws Exception {
- this.jobIdState.clear();
- this.jobIdState.add(RuntimeContextUtils.getJobId(getRuntimeContext()));
- }
-
- private void initCheckpointId(int attemptId, long restoredCheckpointId)
throws Exception {
- if (attemptId <= 0) {
- // returns early if the job/task is initially started.
- return;
- }
- JobID currentJobId = RuntimeContextUtils.getJobId(getRuntimeContext());
- if (StreamSupport.stream(this.jobIdState.get().spliterator(), false)
- .noneMatch(currentJobId::equals)) {
- // do not set up the checkpoint id if the state comes from the old job.
- return;
- }
- this.checkpointId = restoredCheckpointId;
- }
-
private void preLoadRLIRecords() {
int taskID =
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
int parallelism =
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
index 5d599514bb0b..9bc8f8242ec5 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
@@ -100,18 +100,6 @@ public class Correspondent {
}
}
- /**
- * Requests coordinator to wait until all pending instants are committed if
necessary.
- */
- public void awaitPendingInstantsCommitted(long checkpointId) {
- try {
- this.gateway.sendRequestToCoordinator(this.operatorID,
- new
SerializedValue<>(AwaitPendingInstantsRequest.getInstance(checkpointId))).get();
- } catch (Exception e) {
- throw new HoodieException("Error awaiting pending instants completion
from coordinator", e);
- }
- }
-
/**
* A request for instant time with a given checkpoint id.
*/
@@ -165,30 +153,4 @@ public class Correspondent {
return new InflightInstantsResponse(inflightInstants);
}
}
-
- /**
- * A request to wait until all pending instants are committed in the
coordinator.
- */
- @AllArgsConstructor(access = AccessLevel.PRIVATE)
- @Getter
- public static class AwaitPendingInstantsRequest implements
CoordinationRequest {
-
- private final long checkpointId;
-
- public static AwaitPendingInstantsRequest getInstance(long checkpointId) {
- return new AwaitPendingInstantsRequest(checkpointId);
- }
- }
-
- /**
- * A response to indicate pending instants are all completed.
- */
- @AllArgsConstructor(access = AccessLevel.PRIVATE)
- @Getter
- public static class AwaitPendingInstantsResponse implements
CoordinationResponse {
-
- public static AwaitPendingInstantsResponse getInstance() {
- return new AwaitPendingInstantsResponse();
- }
- }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
index 060a74198f5f..87e1010e2227 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
@@ -49,30 +49,25 @@ public class EventBuffers implements Serializable {
// {checkpointId -> (instant, data write events, index write events)}
private final Map<Long, Pair<String, EventBuffer>> eventBuffers;
private final Option<CommitGuard> commitGuardOption;
- private final Option<CommitGuard> indexBootstrapGuardOption;
private final int dataWriteParallelism;
private final int indexWriteParallelism;
private EventBuffers(
Map<Long, Pair<String, EventBuffer>> eventBuffers,
Option<CommitGuard> commitGuardOption,
- Option<CommitGuard> indexBootstrapGuardOption,
int dataWriteParallelism,
int indexWriteParallelism) {
this.eventBuffers = eventBuffers;
this.commitGuardOption = commitGuardOption;
this.dataWriteParallelism = dataWriteParallelism;
this.indexWriteParallelism = indexWriteParallelism;
- this.indexBootstrapGuardOption = indexBootstrapGuardOption;
}
public static EventBuffers getInstance(Configuration conf, int
dataWriteParallelism) {
final Option<CommitGuard> commitGuardOpt =
OptionsResolver.isBlockingInstantGeneration(conf)
?
Option.of(CommitGuard.create(conf.get(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)))
: Option.empty();
- final Option<CommitGuard> indexBootstrapGuardOption =
OptionsResolver.isRLIWithBootstrap(conf)
- ?
Option.of(CommitGuard.create(conf.get(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)))
: Option.empty();
final int indexWriteParallelism =
OptionsResolver.indexWriteParallelism(conf);
- return new EventBuffers(new ConcurrentSkipListMap<>(), commitGuardOpt,
indexBootstrapGuardOption, dataWriteParallelism, indexWriteParallelism);
+ return new EventBuffers(new ConcurrentSkipListMap<>(), commitGuardOpt,
dataWriteParallelism, indexWriteParallelism);
}
public EventBuffer addEventToBuffer(WriteMetadataEvent event) {
@@ -148,17 +143,9 @@ public class EventBuffers implements Serializable {
}
}
- public void awaitPrevInstantsToComplete(long checkpointId) {
- List<String> pendingInstants = getPendingInstantsBefore(checkpointId);
- if (!pendingInstants.isEmpty() &&
this.indexBootstrapGuardOption.isPresent()) {
- this.indexBootstrapGuardOption.get().blockFor(() ->
getPendingInstantsBefore(checkpointId));
- }
- }
-
public void reset(long checkpointId) {
this.eventBuffers.remove(checkpointId);
this.commitGuardOption.ifPresent(CommitGuard::unblock);
- this.indexBootstrapGuardOption.ifPresent(CommitGuard::unblock);
}
public boolean nonEmpty() {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 09c6e4dec448..47d1f8e9e716 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -286,9 +286,6 @@ public class Pipelines {
if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
boolean isRliBootstrap = OptionsResolver.isRecordLevelIndex(conf);
- if (isRliBootstrap) {
- conf.set(FlinkOptions.WRITE_OPERATOR_UID,
Pipelines.opUID("stream_write", conf));
- }
dataStream1 = dataStream1
.transform(
"index_bootstrap",
@@ -407,9 +404,7 @@ public class Pipelines {
throw new HoodieNotSupportedException("Unknown bucket index engine
type: " + bucketIndexEngineType);
}
} else {
- // if the index is RLI, the write operator UID will be pre-generated and
set into the configuration.
- String writeOperatorUid = conf.get(FlinkOptions.WRITE_OPERATOR_UID);
- writeOperatorUid = writeOperatorUid == null ? opUID("stream_write",
conf) : writeOperatorUid;
+ String writeOperatorUid = opUID("stream_write", conf);
// uuid is used to generate operator id for the write operator, then the
bucket assign operator can send
// operator event to the coordinator of the write operator based on the
operator id.
// @see org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway.
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 0d6a375bf7a6..a5ccb2ffb20f 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -38,7 +38,6 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.muttley.AthenaIngestionGateway;
import org.apache.hudi.sink.event.Correspondent;
@@ -672,33 +671,6 @@ public class TestStreamWriteOperatorCoordinator {
assertEquals(instant2, inflightInstants.get(2L));
}
- @Test
- void testHandleAwaitPendingInstantsRequest() throws Exception {
- Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
- conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
- conf.set(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
- conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
- conf.set(FlinkOptions.INDEX_WRITE_TASKS, 4);
- coordinator = createCoordinator(conf, 1);
-
- Thread t = new Thread(() -> {
- try {
- CompletableFuture<CoordinationResponse> responseFuture =
-
coordinator.handleCoordinationRequest(Correspondent.AwaitPendingInstantsRequest.getInstance(1));
- Correspondent.AwaitPendingInstantsResponse response =
- CoordinationResponseSerDe.unwrap(responseFuture.get());
- assertNotNull(response);
- } catch (Exception e) {
- throw new HoodieException(e);
- }
- });
- t.start();
- // send a bootstrap event to unblock the simulated request from bootstrap
operator.
- WriteMetadataEvent event1 = createBootstrapEvent(0, 0,
coordinator.getInstant(), "par1");
- coordinator.handleEventFromOperator(0, event1);
- t.join();
- }
-
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index c753ba4e6dc1..5e4deee54c96 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -18,9 +18,11 @@
package org.apache.hudi.sink;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -29,6 +31,7 @@ import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestData;
@@ -266,6 +269,54 @@ public class TestWriteMergeOnRead extends
TestWriteCopyOnWrite {
.end();
}
+ @Test
+ public void testRecommitOnJobRestartTriggeringGlobalFailover() throws
Exception {
+ conf.set(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
+ conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), "true");
+ conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 10_000L);
+ conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+
+ Map<String, String> expected = new HashMap<>();
+ expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
+ expected.put("par2", "[id3,par2,id3,Julian,53,3,par2,
id4,par2,id4,Fabian,31,4,par2]");
+
+ TestHarness testHarness = preparePipeline(conf)
+ .consume(TestData.DATA_SET_PART1)
+ .emptyEventBuffer()
+ .checkpoint(1)
+ .assertNextEvent(1, "par1")
+ .consume(TestData.DATA_SET_PART3)
+ .checkpoint(2)
+ // both ckp-1 and ckp-2 are not committing
+ .assertNextEvent(1, "par2")
+ // then simulating restarting job manually, coordinator will reset to
ckp-2
+ // and recommit write metadata for ckp-1
+ .restartCoordinator()
+ .subTaskFails(0, 0);
+
+ // global failover is not triggered now, since write metadata events from
writer state are not recommitted.
+ testHarness.assertGlobalFailure(false);
+
+ testHarness.assertNextEvent();
+ // global failover is triggered now, since all the pending write metadata
events are recommitted.
+ testHarness.assertGlobalFailure(true);
+
+ testHarness.subTaskFails(0, 0)
+ .checkIndexLoaded(
+ new HoodieKey("id1", "par1"),
+ new HoodieKey("id3", "par2"))
+ // insert another batch of data.
+ .consume(TestData.DATA_SET_PART4)
+ .checkpoint(3)
+ .assertNextEvent(1, "par2")
+ // write metadata will be committed for ckp-3
+ .checkpointComplete(3)
+ // there should be 3 rows and 2 partitions
+ .checkWrittenData(expected, 2)
+ .end();
+ }
+
@Test
public void testInsertDuplicateRecordsWithCDCMode() throws Exception {
conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 10_000L);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateSnapshotContext.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateSnapshotContext.java
new file mode 100644
index 000000000000..b9deee81adf0
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/MockStateSnapshotContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import
org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
+
+/** A {@link StateSnapshotContext} for testing purpose. */
+public class MockStateSnapshotContext extends MockFunctionSnapshotContext
implements StateSnapshotContext {
+ public MockStateSnapshotContext(long checkpointId) {
+ super(checkpointId);
+ }
+
+ @Override
+ public KeyedStateCheckpointOutputStream getRawKeyedOperatorStateOutput()
throws Exception {
+ throw new UnsupportedOperationException("Unsupported now.");
+ }
+
+ @Override
+ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput()
throws Exception {
+ throw new UnsupportedOperationException("Unsupported now.");
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 254592556669..dac07150295e 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -28,8 +28,10 @@ import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.bootstrap.AbstractBootstrapOperator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
+import org.apache.hudi.sink.bootstrap.RLIBootstrapOperator;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
@@ -59,6 +61,7 @@ import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
+import org.mockito.Mockito;
import java.util.HashSet;
import java.util.List;
@@ -66,7 +69,9 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -99,7 +104,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
/**
* Function that load index in state.
*/
- private BootstrapOperator bootstrapOperator;
+ private AbstractBootstrapOperator bootstrapOperator;
/**
* Function that assigns bucket ID.
*/
@@ -179,9 +184,12 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
bucketAssignerFunction.initializeState(this.stateInitializationContext);
if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
- bootstrapOperator = new BootstrapOperator(conf);
+ bootstrapOperator = isStreamingWriteIndexEnabled ? Mockito.spy(new
RLIBootstrapOperator(conf)) : new BootstrapOperator(conf);
CollectOutputAdapter<HoodieFlinkInternalRow> output = new
CollectOutputAdapter<>();
bootstrapOperator.setup(streamTask, streamConfig, output);
+ if (isStreamingWriteIndexEnabled) {
+ doReturn(runtimeContext).when(bootstrapOperator).getRuntimeContext();
+ }
bootstrapOperator.initializeState(this.stateInitializationContext);
Collector<HoodieFlinkInternalRow> collector =
RecordsCollector.getInstance(rowType);
@@ -258,7 +266,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
// checkpoint the coordinator first
checkpointCoordinator(checkpointId);
if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
- bootstrapOperator.snapshotState(null);
+ bootstrapOperator.snapshotState(new
MockStateSnapshotContext(checkpointId));
}
bucketAssignerFunction.snapshotState(new
MockFunctionSnapshotContext(checkpointId));
@@ -346,8 +354,12 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
coordinator.subtaskFailed(taskID, new RuntimeException("Dummy exception"));
// reset the attempt number to simulate the task failover/retries
this.runtimeContext.setAttemptNumber(attemptNumber);
+ this.bucketAssignFunctionContext.clear();
setupWriteFunction();
if (supportStreamingWriteIndex()) {
+ if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+ setupIndexBootstrapFunction();
+ }
setupIndexWriteFunction();
}
}
@@ -384,7 +396,10 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
}
public boolean isAlreadyBootstrap() throws Exception {
- return this.bootstrapOperator.isAlreadyBootstrap();
+ if (this.bootstrapOperator instanceof BootstrapOperator) {
+ return ((BootstrapOperator) this.bootstrapOperator).isAlreadyBootstrap();
+ }
+ return this.bootstrapOperator != null;
}
// -------------------------------------------------------------------------
@@ -410,6 +425,27 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
indexWriteFunction.open(conf);
}
+ private void setupIndexBootstrapFunction() {
+ bootstrapOperator = Mockito.spy(new RLIBootstrapOperator(conf));
+ CollectOutputAdapter<HoodieFlinkInternalRow> output = new
CollectOutputAdapter<>();
+ bootstrapOperator.setup(streamTask, streamConfig, output);
+ doReturn(runtimeContext).when(bootstrapOperator).getRuntimeContext();
+ try {
+ // may be blocked on pending instants committing in coordinator.
+ bootstrapOperator.initializeState(this.stateInitializationContext);
+
+ Collector<HoodieFlinkInternalRow> collector =
RecordsCollector.getInstance(rowType);
+ for (HoodieFlinkInternalRow bootstrapRecord : output.getRecords()) {
+
stateInitializationContext.getKeyedStateStore().setCurrentKey(bootstrapRecord.getRecordKey());
+
when(context.getCurrentKey()).thenReturn(bootstrapRecord.getRecordKey());
+ bucketAssignerFunction.processElement(bootstrapRecord, context,
collector);
+
bucketAssignFunctionContext.setCurrentKey(bootstrapRecord.getRecordKey());
+ }
+ } catch (Exception e) {
+ throw new CompletionException(e);
+ }
+ }
+
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
@@ -424,5 +460,9 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
public boolean isKeyInState(String key) {
return this.updateKeys.contains(key);
}
+
+ public void clear() {
+ this.updateKeys.clear();
+ }
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
index 40fe4e58edd0..c2c7f7d338f3 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
@@ -19,7 +19,6 @@
package org.apache.hudi.sink.utils;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.flink.configuration.Configuration;
@@ -29,7 +28,6 @@ import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -69,46 +67,6 @@ public class TestEventBuffers {
}
}
- @Test
- void testAwaitPrevInstantsWaitsUntilAllPreviousCheckpointsReset() throws
Exception {
- Configuration conf = new Configuration();
- conf.set(FlinkOptions.INDEX_TYPE,
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
- conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
- conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 5_000L);
- conf.set(FlinkOptions.INDEX_WRITE_TASKS, 4);
- EventBuffers eventBuffers = EventBuffers.getInstance(conf, 1);
-
- eventBuffers.initNewEventBuffer(1L, "001");
- eventBuffers.initNewEventBuffer(2L, "002");
- eventBuffers.initNewEventBuffer(3L, "003");
-
- ExecutorService executor = Executors.newSingleThreadExecutor();
- try {
- // two tasks block waiting on commit.
- CompletableFuture<Void> waitingFuture1 = CompletableFuture.runAsync(
- () -> eventBuffers.awaitPrevInstantsToComplete(3L), executor);
- CompletableFuture<Void> waitingFuture2 = CompletableFuture.runAsync(
- () -> eventBuffers.awaitPrevInstantsToComplete(3L), executor);
-
- Thread.sleep(100);
- assertFalse(waitingFuture1.isDone());
- assertFalse(waitingFuture2.isDone());
-
- eventBuffers.reset(1L);
- Thread.sleep(100);
- assertFalse(waitingFuture1.isDone(), "Checkpoint 2 still pending, should
continue blocking");
- assertFalse(waitingFuture2.isDone(), "Checkpoint 2 still pending, should
continue blocking");
-
- eventBuffers.reset(2L);
- waitingFuture1.get(2, TimeUnit.SECONDS);
- assertTrue(waitingFuture1.isDone());
- waitingFuture2.get(2, TimeUnit.SECONDS);
- assertTrue(waitingFuture2.isDone());
- } finally {
- executor.shutdownNow();
- }
- }
-
private static WriteMetadataEvent newWriteEvent(long checkpointId, String
instant) {
return WriteMetadataEvent.builder()
.taskID(0)
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index c7ca2ee1b485..b4a265e2b252 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -483,6 +483,11 @@ public class TestWriteBase {
return this;
}
+ public TestHarness assertGlobalFailure(boolean failed) {
+ assertEquals(failed,
this.pipeline.getCoordinatorContext().isJobFailed());
+ return this;
+ }
+
public TestHarness checkpointThrows(long checkpointId, String message) {
// this returns early because there is no inflight instant
assertThrows(HoodieException.class, () -> checkpoint(checkpointId),
message);
@@ -616,6 +621,14 @@ public class TestWriteBase {
return this;
}
+ public TestHarness checkIndexNotLoaded(HoodieKey... keys) {
+ for (HoodieKey key : keys) {
+ assertFalse(this.pipeline.isKeyInState(key),
+ "Key: " + key + " assumes to not in the index state");
+ }
+ return this;
+ }
+
public TestHarness assertBootstrapped() throws Exception {
assertTrue(this.pipeline.isAlreadyBootstrap());
return this;