This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch revert-3915-HUDI-2677 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ea1a6bfcc4258692157634c35d6810bb91632af8 Author: Danny Chan <[email protected]> AuthorDate: Thu Nov 4 20:45:36 2021 +0800 Revert "[HUDI-2677] Add DFS based message queue for flink writer (#3915)" This reverts commit dbf8c44bdb3019f2ce93d6b1224d9d478c0340fa. --- .../org/apache/hudi/sink/StreamWriteFunction.java | 8 +- .../hudi/sink/StreamWriteOperatorCoordinator.java | 65 +++++--- .../hudi/sink/bulk/BulkInsertWriteFunction.java | 40 ++--- .../sink/common/AbstractStreamWriteFunction.java | 98 ++---------- .../org/apache/hudi/sink/message/MessageBus.java | 173 --------------------- .../apache/hudi/sink/message/MessageClient.java | 126 --------------- .../apache/hudi/sink/message/MessageDriver.java | 132 ---------------- .../apache/hudi/sink/message/TestMessageBus.java | 137 ---------------- .../sink/utils/StreamWriteFunctionWrapper.java | 1 - 9 files changed, 79 insertions(+), 701 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 11564d1..0e7e35e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -137,7 +137,6 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { @Override public void close() { - super.close(); if (this.writeClient != null) { this.writeClient.cleanHandlesGracefully(); this.writeClient.close(); @@ -402,6 +401,11 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { } } + private boolean hasData() { + return this.buckets.size() > 0 + && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0); + } + @SuppressWarnings("unchecked, rawtypes") private boolean flushBucket(DataBucket bucket) { String instant = instantToWrite(true); @@ -435,7 +439,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { @SuppressWarnings("unchecked, rawtypes") private void flushRemaining(boolean endInput) { - this.currentInstant = instantToWrite(false); + this.currentInstant = instantToWrite(hasData()); if (this.currentInstant == null) { // in case there are empty checkpoints that has no input data throw new HoodieException("No inflight instant when flushing data!"); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index a30d766..feb348f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -30,9 +30,8 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.sink.event.CommitAckEvent; import org.apache.hudi.sink.event.WriteMetadataEvent; -import org.apache.hudi.sink.message.MessageBus; -import org.apache.hudi.sink.message.MessageDriver; import org.apache.hudi.sink.utils.HiveSyncContext; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.StreamerUtil; @@ -42,6 +41,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,11 +137,6 @@ public class StreamWriteOperatorCoordinator private transient TableState tableState; /** - * The message driver. - */ - private MessageDriver messageDriver; - - /** * Constructs a StreamingSinkOperatorCoordinator. * * @param conf The config options @@ -179,7 +174,6 @@ public class StreamWriteOperatorCoordinator if (tableState.syncMetadata) { initMetadataSync(); } - this.messageDriver = MessageBus.getDriver(this.metaClient.getFs(), metaClient.getBasePath()); } @Override @@ -197,9 +191,6 @@ public class StreamWriteOperatorCoordinator writeClient.close(); } this.eventBuffer = null; - if (this.messageDriver != null) { - this.messageDriver.close(); - } } @Override @@ -236,7 +227,7 @@ public class StreamWriteOperatorCoordinator writeClient.scheduleCompaction(Option.empty()); } // start new instant. - startInstant(checkpointId); + startInstant(); // sync Hive if is enabled syncHiveIfEnabled(); } @@ -246,7 +237,12 @@ public class StreamWriteOperatorCoordinator @Override public void notifyCheckpointAborted(long checkpointId) { - this.messageDriver.abortCkp(checkpointId); + // once the checkpoint was aborted, unblock the writer tasks to + // reuse the last instant. + if (!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) { + executor.execute(() -> sendCommitAckEvents(checkpointId), + "unblock data write with aborted checkpoint %s", checkpointId); + } } @Override @@ -337,19 +333,12 @@ public class StreamWriteOperatorCoordinator } private void startInstant() { - // the flink checkpoint id starts from 1, - // see AbstractStreamWriteFunction#ackInstant - startInstant(MessageBus.INITIAL_CKP_ID); - } - - private void startInstant(long checkpoint) { final String instant = HoodieActiveTimeline.createNewInstantTime(); this.writeClient.startCommitWithTime(instant, tableState.commitAction); - this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, instant); - this.writeClient.upgradeDowngrade(instant); - this.messageDriver.commitCkp(checkpoint, this.instant, instant); this.instant = instant; - LOG.info("Create instant [{}] for table [{}] with type [{}]", instant, + this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant); + this.writeClient.upgradeDowngrade(this.instant); + LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); } @@ -409,6 +398,33 @@ public class StreamWriteOperatorCoordinator } /** + * The coordinator reuses the instant if there is no data for this round of checkpoint, + * sends the commit ack events to unblock the flushing. + */ + private void sendCommitAckEvents(long checkpointId) { + CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull) + .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId))) + .toArray(CompletableFuture<?>[]::new); + try { + CompletableFuture.allOf(futures).get(); + } catch (Throwable throwable) { + if (!sendToFinishedTasks(throwable)) { + throw new HoodieException("Error while waiting for the commit ack events to finish sending", throwable); + } + } + } + + /** + * Decides whether the given exception is caused by sending events to FINISHED tasks. + * + * <p>Ugly impl: the exception may change in the future. + */ + private static boolean sendToFinishedTasks(Throwable throwable) { + return throwable.getCause() instanceof TaskNotRunningException + || throwable.getCause().getMessage().contains("running"); + } + + /** * Commits the instant. */ private void commitInstant(String instant) { @@ -435,7 +451,8 @@ public class StreamWriteOperatorCoordinator if (writeResults.size() == 0) { // No data has written, reset the buffer and returns early reset(); - messageDriver.commitCkp(checkpointId, this.instant, this.instant); + // Send commit ack event to the write function to unblock the flushing + sendCommitAckEvents(checkpointId); return false; } doCommit(instant, writeResults); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java index f5fda5a..f3cfbae 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java @@ -21,13 +21,11 @@ package org.apache.hudi.sink.bulk; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.common.AbstractWriteFunction; import org.apache.hudi.sink.event.WriteMetadataEvent; -import org.apache.hudi.sink.message.MessageBus; -import org.apache.hudi.sink.message.MessageClient; import org.apache.hudi.sink.utils.TimeWait; import org.apache.hudi.util.StreamerUtil; @@ -40,8 +38,6 @@ import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; - import java.io.IOException; import java.util.Collections; import java.util.List; @@ -84,19 +80,24 @@ public class BulkInsertWriteFunction<I> private int taskID; /** + * Meta Client. + */ + private transient HoodieTableMetaClient metaClient; + + /** * Write Client. */ private transient HoodieFlinkWriteClient writeClient; /** - * Gateway to send operator events to the operator coordinator. + * The initial inflight instant when start up. */ - private transient OperatorEventGateway eventGateway; + private volatile String initInstant; /** - * The message client. + * Gateway to send operator events to the operator coordinator. */ - private MessageClient messageClient; + private transient OperatorEventGateway eventGateway; /** * Constructs a StreamingSinkFunction. @@ -111,8 +112,9 @@ public class BulkInsertWriteFunction<I> @Override public void open(Configuration parameters) throws IOException { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + this.metaClient = StreamerUtil.createMetaClient(this.config); this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); - this.messageClient = MessageBus.getClient(config.getString(FlinkOptions.PATH)); + this.initInstant = StreamerUtil.getLastPendingInstant(this.metaClient, false); sendBootstrapEvent(); initWriterHelper(); } @@ -128,9 +130,6 @@ public class BulkInsertWriteFunction<I> this.writeClient.cleanHandlesGracefully(); this.writeClient.close(); } - if (this.messageClient != null) { - this.messageClient.close(); - } } /** @@ -184,17 +183,8 @@ public class BulkInsertWriteFunction<I> LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); } - /** - * Returns the next instant to write from the message bus. - */ - @Nullable - private String ackInstant() { - Option<MessageBus.CkpMessage> ckpMessageOption = this.messageClient.getCkpMessage(MessageBus.INITIAL_CKP_ID); - return ckpMessageOption.map(message -> message.inflightInstant).orElse(null); - } - private String instantToWrite() { - String instant = ackInstant(); + String instant = StreamerUtil.getLastPendingInstant(this.metaClient); // if exactly-once semantics turns on, // waits for the checkpoint notification until the checkpoint timeout threshold hits. TimeWait timeWait = TimeWait.builder() @@ -202,14 +192,14 @@ public class BulkInsertWriteFunction<I> .action("instant initialize") .throwsT(true) .build(); - while (instant == null) { + while (instant == null || instant.equals(this.initInstant)) { // wait condition: // 1. there is no inflight instant // 2. the inflight instant does not change // sleep for a while timeWait.waitFor(); // refresh the inflight instant - instant = ackInstant(); + instant = StreamerUtil.getLastPendingInstant(this.metaClient); } return instant; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index c3fcec0..5ad2935 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -21,14 +21,11 @@ package org.apache.hudi.sink.common; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.event.CommitAckEvent; import org.apache.hudi.sink.event.WriteMetadataEvent; -import org.apache.hudi.sink.message.MessageBus; -import org.apache.hudi.sink.message.MessageClient; import org.apache.hudi.sink.utils.TimeWait; import org.apache.hudi.util.StreamerUtil; @@ -42,14 +39,12 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.util.CollectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.List; +import java.util.Objects; /** * Base infrastructures for streaming writer function. @@ -125,11 +120,6 @@ public abstract class AbstractStreamWriteFunction<I> private long checkpointId = -1; /** - * The message client. - */ - private MessageClient messageClient; - - /** * Constructs a StreamWriteFunctionBase. * * @param config The config options @@ -150,6 +140,7 @@ public abstract class AbstractStreamWriteFunction<I> TypeInformation.of(WriteMetadataEvent.class) )); + this.currentInstant = lastPendingInstant(); if (context.isRestored()) { restoreWriteMetadata(); } else { @@ -157,7 +148,6 @@ public abstract class AbstractStreamWriteFunction<I> } // blocks flushing until the coordinator starts a new instant this.confirming = true; - this.messageClient = MessageBus.getClient(this.metaClient.getFs(), this.metaClient.getBasePath()); } @Override @@ -187,19 +177,14 @@ public abstract class AbstractStreamWriteFunction<I> // ------------------------------------------------------------------------- private void restoreWriteMetadata() throws Exception { - List<WriteMetadataEvent> events = CollectionUtil.iterableToList(this.writeMetadataState.get()); + String lastInflight = lastPendingInstant(); boolean eventSent = false; - if (events.size() > 0) { - boolean committed = this.metaClient.getActiveTimeline() - .filterCompletedInstants() - .containsInstant(events.get(0).getInstantTime()); - if (!committed) { - for (WriteMetadataEvent event : events) { - // The checkpoint succeed but the meta does not commit, - // re-commit the inflight instant - this.eventGateway.sendEventToCoordinator(event); - LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID); - } + for (WriteMetadataEvent event : this.writeMetadataState.get()) { + if (Objects.equals(lastInflight, event.getInstantTime())) { + // The checkpoint succeed but the meta does not commit, + // re-commit the inflight instant + this.eventGateway.sendEventToCoordinator(event); + LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID); eventSent = true; } } @@ -237,65 +222,21 @@ public abstract class AbstractStreamWriteFunction<I> } } - @Override - public void close() { - if (this.messageClient != null) { - this.messageClient.close(); - } - } - /** * Returns the last pending instant time. */ - private String lastPendingInstant() { - return StreamerUtil.getLastPendingInstant(metaClient); - } - - /** - * Returns the previous committed checkpoint id. - * - * @param eagerFlush Whether the data flush happens before the checkpoint barrier arrives - */ - private long prevCkp(boolean eagerFlush) { - // Use the last checkpoint id to request for the message, - // the time sequence of committed checkpoints and ongoing - // checkpoints are as following: - - // 0 ------------ 1 ------------ 2 ------------ 3 ------------> committed ckp id - // | / / / / - // |--- ckp-1 ----|--- ckp-2 ----|--- ckp-3 ----|--- ckp-4 ----| ongoing ckp id - - // Use 0 as the initial committed checkpoint id, the 0th checkpoint message records the writing instant for ckp-1; - // when ckp-1 success event is received, commits a checkpoint message with the writing instant for ckp-2; - // that means, the checkpoint message records the writing instant of next checkpoint. - return Math.max(0, eagerFlush ? this.checkpointId : this.checkpointId - 1); - } - - /** - * Returns the next instant to write from the message bus. - * - * <p>It returns 3 kinds of value: - * i) normal instant time: the previous checkpoint succeed; - * ii) 'aborted' instant time: the previous checkpoint has been aborted; - * ii) null: the checkpoint is till ongoing without any notifications. - */ - @Nullable - protected String ackInstant(long checkpointId) { - Option<MessageBus.CkpMessage> ckpMessageOption = this.messageClient.getCkpMessage(checkpointId); - return ckpMessageOption.map(message -> message.inflightInstant).orElse(null); + protected String lastPendingInstant() { + return StreamerUtil.getLastPendingInstant(this.metaClient); } /** * Prepares the instant time to write with for next checkpoint. * - * @param eagerFlush Whether the data flush happens before the checkpoint barrier arrives - * + * @param hasData Whether the task has buffering data * @return The instant time */ - protected String instantToWrite(boolean eagerFlush) { - final long ckpId = prevCkp(eagerFlush); - String instant = ackInstant(ckpId); - + protected String instantToWrite(boolean hasData) { + String instant = lastPendingInstant(); // if exactly-once semantics turns on, // waits for the checkpoint notification until the checkpoint timeout threshold hits. TimeWait timeWait = TimeWait.builder() @@ -306,23 +247,18 @@ public abstract class AbstractStreamWriteFunction<I> // wait condition: // 1. there is no inflight instant // 2. the inflight instant does not change and the checkpoint has buffering data - if (instant == null) { + if (instant == null || (instant.equals(this.currentInstant) && hasData)) { // sleep for a while boolean timeout = timeWait.waitFor(); - if (timeout && MessageBus.notInitialCkp(ckpId)) { + if (timeout && instant != null) { // if the timeout threshold hits but the last instant still not commit, // and the task does not receive commit ask event(no data or aborted checkpoint), // assumes the checkpoint was canceled silently and unblock the data flushing confirming = false; - instant = lastPendingInstant(); } else { // refresh the inflight instant - instant = ackInstant(ckpId); + instant = lastPendingInstant(); } - } else if (MessageBus.canAbort(instant, ckpId)) { - // the checkpoint was canceled, reuse the last instant - confirming = false; - instant = lastPendingInstant(); } else { // the pending instant changed, that means the last instant was committed // successfully. diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java deleted file mode 100644 index ff8f3eb..0000000 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink.message; - -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.util.FileIOUtils; -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.util.StreamerUtil; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; - -/** - * A message bus for transferring the checkpoint messages. - * - * <p>Each time the driver starts a new instant, it writes a commit message into the bus, the write tasks - * then consume the message and unblocking the data flush. - * - * <p>Why we use the DFS based message queue instead of sending - * the {@link org.apache.flink.runtime.operators.coordination.OperatorEvent} ? - * The write task handles the operator event using the main mailbox executor which has the lowest priority for mails, - * it is also used to process the inputs. When the write task blocks and waits for the operator event to ack the valid instant to write, - * it actually blocks all the following events in the mailbox, the operator event can never be consumed then it causes deadlock. - * - * <p>The message bus is also more lightweight than the active timeline. - */ -public abstract class MessageBus implements AutoCloseable { - - public static final long INITIAL_CKP_ID = 0L; - - public static final String ABORTED_CKP_INSTANT = "aborted"; - - protected static final int MESSAGE_QUEUE_LENGTH = 20; - - protected static final int CLIENT_MESSAGE_CACHE_SIZE = 10; - - private static final String MESSAGE_BUS = "message_bus"; - - private static final String COMMIT = "commit"; - - private static final String COMMIT_EXTENSION = "." + COMMIT; - private static final String ABORTED_EXTENSION = ".aborted"; - - protected final FileSystem fs; - protected final String basePath; - protected final String messageBusPath; - - protected MessageBus(FileSystem fs, String basePath) { - this.fs = fs; - this.basePath = basePath; - this.messageBusPath = messageBusPath(basePath); - } - - public static MessageDriver getDriver(FileSystem fs, String basePath) { - return MessageDriver.getInstance(fs, basePath); - } - - public static MessageClient getClient(FileSystem fs, String basePath) { - return MessageClient.getSingleton(fs, basePath); - } - - public static MessageClient getClient(String basePath) { - FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()); - return MessageClient.getSingleton(fs, basePath); - } - - // ------------------------------------------------------------------------- - // Utilities - // ------------------------------------------------------------------------- - public static boolean canAbort(String instant, long checkpointId) { - return ABORTED_CKP_INSTANT.equals(instant) && MessageBus.notInitialCkp(checkpointId); - } - - public static boolean notInitialCkp(long checkpointId) { - return checkpointId != INITIAL_CKP_ID; - } - - protected Path fullFilePath(String fileName) { - return new Path(messageBusPath, fileName); - } - - protected static String messageBusPath(String basePath) { - return basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + MESSAGE_BUS; - } - - protected static String getCommitFileName(long checkpointId) { - return checkpointId + COMMIT_EXTENSION; - } - - protected static String getAbortedFileName(long checkpointId) { - return checkpointId + ABORTED_EXTENSION; - } - - // ------------------------------------------------------------------------- - // Inner Class - // ------------------------------------------------------------------------- - - /** - * A checkpoint message. - */ - public static class CkpMessage { - private static final String SEPARATOR = ","; - - public final boolean committed; // whether the checkpoint is committed - - public final long checkpointId; - public final String commitInstant; - public final String inflightInstant; - - private CkpMessage(long checkpointId, String commitInstant, String inflightInstant) { - this.committed = true; - this.checkpointId = checkpointId; - this.commitInstant = commitInstant; - this.inflightInstant = inflightInstant; - } - - private CkpMessage(long checkpointId) { - this.committed = false; - this.checkpointId = checkpointId; - this.commitInstant = ABORTED_CKP_INSTANT; - this.inflightInstant = ABORTED_CKP_INSTANT; - } - - /** - * Encodes the instants as 'commitInstant,inflightInstant'. - */ - public static byte[] toBytes(String commitInstant, String inflightInstant) { - return (commitInstant + SEPARATOR + inflightInstant).getBytes(StandardCharsets.UTF_8); - } - - public static CkpMessage fromBytes(long checkpointId, byte[] bytes) { - String content = new String(bytes, StandardCharsets.UTF_8); - String[] splits = content.split(SEPARATOR); - return new CkpMessage(checkpointId, splits[0], splits[1]); - } - - public static CkpMessage fromPath(FileSystem fs, Path path) throws IOException { - final String[] splits = path.getName().split("\\."); - ValidationUtils.checkState(splits.length == 2, "Invalid checkpoint message file name: " + path.getName()); - final long checkpointId = Long.parseLong(splits[0]); - final String suffix = splits[1]; - if (suffix.equals(COMMIT)) { - try (FSDataInputStream is = fs.open(path)) { - byte[] bytes = FileIOUtils.readAsByteArray(is); - return CkpMessage.fromBytes(checkpointId, bytes); - } - } else { - return new CkpMessage(checkpointId); - } - } - } -} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java deleted file mode 100644 index ea893d5..0000000 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink.message; - -import org.apache.hudi.common.util.Option; -import org.apache.hudi.exception.HoodieException; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; - -/** - * A client that consumes messages from the {@link MessageBus}. - */ -public class MessageClient extends MessageBus { - private static final Logger LOG = LoggerFactory.getLogger(MessageClient.class); - - private static final Map<String, MessageClient> CLIENTS = new HashMap<>(); - - private final TreeMap<Long, CkpMessage> ckpCache; // checkpoint id -> CkpMessage mapping - - private MessageClient(FileSystem fs, String basePath) throws IOException { - super(fs, basePath); - this.ckpCache = new TreeMap<>(); - } - - /** - * Returns the message bus instance. - * - * <p>This expects to be called by the client. - * - * @param fs The filesystem - * @param basePath The table base path - * @return The instance of message bus - */ - private static MessageClient getInstance(FileSystem fs, String basePath) { - try { - return new MessageClient(fs, basePath); - } catch (IOException e) { - throw new HoodieException("Initialize checkpoint message bus error", e); - } - } - - /** - * Returns the singleton message bus instance. - * - * <p>This expects to be called by the client. - * - * @param fs The filesystem - * @param basePath The table base path - * @return The instance of message bus - */ - public static synchronized MessageClient getSingleton(FileSystem fs, String basePath) { - return CLIENTS.computeIfAbsent(basePath, - k -> getInstance(fs, basePath)); - } - - public synchronized Option<CkpMessage> getCkpMessage(long checkpointId) { - if (this.ckpCache.size() >= CLIENT_MESSAGE_CACHE_SIZE) { - this.ckpCache.pollFirstEntry(); - } - if (this.ckpCache.containsKey(checkpointId)) { - return Option.of(this.ckpCache.get(checkpointId)); - } - final Path commitFilePath = fullFilePath(getCommitFileName(checkpointId)); - try { - if (fs.exists(commitFilePath)) { - CkpMessage ckpMessage = CkpMessage.fromPath(fs, commitFilePath); - this.ckpCache.put(checkpointId, ckpMessage); - return Option.of(ckpMessage); - } - } catch (Throwable e) { - // ignored - LOG.warn("Read committed checkpoint message error: " + checkpointId, e); - return Option.empty(); - } - final Path abortedFilePath = fullFilePath(getAbortedFileName(checkpointId)); - try { - if (fs.exists(abortedFilePath)) { - CkpMessage ckpMessage = CkpMessage.fromPath(fs, abortedFilePath); - this.ckpCache.put(checkpointId, ckpMessage); - return Option.of(ckpMessage); - } - } catch (Throwable e) { - // ignored - LOG.warn("Read aborted checkpoint message error: " + checkpointId, e); - return Option.empty(); - } - return Option.empty(); - } - - @VisibleForTesting - public TreeMap<Long, CkpMessage> getCkpCache() { - return ckpCache; - } - - @Override - public void close() { - synchronized (this) { - this.ckpCache.clear(); - } - } -} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java deleted file mode 100644 index bf98209..0000000 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink.message; - -import org.apache.hudi.exception.HoodieException; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; -import java.util.Map; -import java.util.TreeMap; - -/** - * A driver that produces messages to the {@link MessageBus}. - */ -public class MessageDriver extends MessageBus { - private final TreeMap<Long, Boolean> ckpIdCache; // checkpoint id -> isCommitted mapping - - public MessageDriver(FileSystem fs, String basePath) throws IOException { - super(fs, basePath); - this.ckpIdCache = new TreeMap<>(); - initialize(); - } - - /** - * Returns the message bus instance. - * - * <p>This expects to be called by the driver. - * - * @param fs The filesystem - * @param basePath The table base path - * @return The instance of message bus - */ - public static MessageDriver getInstance(FileSystem fs, String basePath) { - try { - return new MessageDriver(fs, basePath); - } catch (IOException e) { - throw new HoodieException("Initialize checkpoint message bus error", e); - } - } - - /** - * Initialize the message bus, would clean all the messages. - * - * <p>This expects to be called by the driver. - */ - private void initialize() throws IOException { - Path path = new Path(messageBusPath(basePath)); - if (fs.exists(path)) { - fs.delete(path, true); - } - fs.mkdirs(path); - } - - /** - * Add a checkpoint commit message. - * - * @param checkpointId The checkpoint id - * @param commitInstant The committed instant - * @param inflightInstant The new inflight instant - */ - public void commitCkp(long checkpointId, String commitInstant, String inflightInstant) { - Path path = fullFilePath(getCommitFileName(checkpointId)); - - try (FSDataOutputStream outputStream = fs.create(path, true)) { - byte[] bytes = CkpMessage.toBytes(commitInstant, inflightInstant); - outputStream.write(bytes); - outputStream.close(); - this.ckpIdCache.put(checkpointId, true); - clean(); - } catch (Throwable e) { - throw new HoodieException("Adding committed message error for checkpoint: " + checkpointId, e); - } - } - - /** - * Add an aborted checkpoint message. - * - * @param checkpointId The checkpoint id - */ - public void abortCkp(long checkpointId) { - Path path = fullFilePath(getAbortedFileName(checkpointId)); - try { - fs.createNewFile(path); - this.ckpIdCache.put(checkpointId, false); - clean(); - } catch (Throwable e) { - throw new HoodieException("Adding aborted message error for checkpoint: " + checkpointId, e); - } - } - - private void clean() throws IOException { - int numToClean = this.ckpIdCache.size() - MESSAGE_QUEUE_LENGTH; - if (numToClean >= 10) { - for (int i = 0; i < numToClean; i++) { - Map.Entry<Long, Boolean> entry = this.ckpIdCache.pollFirstEntry(); - final String fileName = entry.getValue() ? getCommitFileName(entry.getKey()) : getAbortedFileName(entry.getKey()); - final Path filePath = fullFilePath(fileName); - fs.delete(filePath, false); - } - } - } - - @VisibleForTesting - public TreeMap<Long, Boolean> getCkpIdCache() { - return ckpIdCache; - } - - @Override - public void close() throws Exception { - this.ckpIdCache.clear(); - } -} diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/message/TestMessageBus.java b/hudi-flink/src/test/java/org/apache/hudi/sink/message/TestMessageBus.java deleted file mode 100644 index b161c96..0000000 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/message/TestMessageBus.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sink.message; - -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.util.StreamerUtil; -import org.apache.hudi.utils.TestConfigurations; - -import org.apache.flink.configuration.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.stream.IntStream; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** - * Test cases for {@link MessageBus}. - */ -public class TestMessageBus { - - private String basePath; - private FileSystem fs; - - private MessageDriver driver; - - @TempDir - File tempFile; - - @BeforeEach - public void beforeEach() throws Exception { - basePath = tempFile.getAbsolutePath(); - this.fs = FSUtils.getFs(tempFile.getAbsolutePath(), StreamerUtil.getHadoopConf()); - - Configuration conf = TestConfigurations.getDefaultConf(basePath); - StreamerUtil.initTableIfNotExists(conf); - - this.driver = MessageDriver.getInstance(fs, basePath); - } - - @Test - void testWriteAndReadMessage() { - MessageClient client = MessageClient.getSingleton(fs, basePath); - - // write and read 5 committed checkpoints - IntStream.range(0, 5).forEach(i -> driver.commitCkp(i, i + "", i + 1 + "")); - - IntStream.range(0, 5).forEach(i -> { - Option<MessageBus.CkpMessage> messageOpt = client.getCkpMessage(i); - assertTrue(messageOpt.isPresent()); - - MessageBus.CkpMessage ckpMessage = messageOpt.get(); - assertTrue(ckpMessage.committed); - assertThat(ckpMessage.commitInstant, is(i + "")); - assertThat(ckpMessage.inflightInstant, is(i + 1 + "")); - }); - - // write and read 5 aborted checkpoints - IntStream.range(5, 10).forEach(i -> driver.abortCkp(i)); - - IntStream.range(5, 10).forEach(i -> { - Option<MessageBus.CkpMessage> messageOpt = client.getCkpMessage(i); - assertTrue(messageOpt.isPresent()); - - MessageBus.CkpMessage ckpMessage = messageOpt.get(); - assertFalse(ckpMessage.committed); - assertThat(ckpMessage.commitInstant, is(MessageBus.ABORTED_CKP_INSTANT)); - assertThat(ckpMessage.inflightInstant, is(MessageBus.ABORTED_CKP_INSTANT)); - }); - } - - @Test - void testWriteCleaning() { - // write and read 20 committed checkpoints - IntStream.range(0, 20).forEach(i -> driver.commitCkp(i, i + "", i + 1 + "")); - assertThat("The id cache should not be cleaned", driver.getCkpIdCache().size(), is(20)); - - // write and read 10 aborted checkpoints - IntStream.range(20, 29).forEach(i -> driver.abortCkp(i)); - assertThat("The id cache should not be cleaned", driver.getCkpIdCache().size(), is(29)); - - driver.commitCkp(29, "29", "30"); - assertThat("The cache should be cleaned", driver.getCkpIdCache().size(), is(20)); - assertThat(longSet2String(driver.getCkpIdCache().keySet()), - is("10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29")); - } - - @Test - void testReadCleaning() { - MessageClient client = MessageClient.getSingleton(fs, basePath); - - // write and read 20 committed checkpoints - IntStream.range(0, 20).forEach(i -> driver.commitCkp(i, i + "", i + 1 + "")); - - IntStream.range(0, 10).forEach(client::getCkpMessage); - assertThat("The checkpoint cache should not be cleaned", client.getCkpCache().size(), is(10)); - - client.getCkpMessage(10); - assertThat("The checkpoint cache should be cleaned", client.getCkpCache().size(), is(10)); - - IntStream.range(11, 15).forEach(client::getCkpMessage); - assertThat("The checkpoint cache should be cleaned", client.getCkpCache().size(), is(10)); - assertThat(longSet2String(client.getCkpCache().keySet()), is("5,6,7,8,9,10,11,12,13,14")); - } - - private static String longSet2String(Set<Long> longSet) { - List<String> elements = new ArrayList<>(); - longSet.stream().mapToInt(Long::intValue).forEach(i -> elements.add(i + "")); - return String.join(",", elements); - } -} diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index f1f5a1f..54a142a 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -248,7 +248,6 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> { public void close() throws Exception { coordinator.close(); ioManager.close(); - writeFunction.close(); } public StreamWriteOperatorCoordinator getCoordinator() {
