This is an automated email from the ASF dual-hosted git repository.
vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new afbde168b1d8 refactor: Add Lombok annotations to hudi-kafka-connect (#17715)
afbde168b1d8 is described below
commit afbde168b1d809be20c1aca054a0f49e23f97a57
Author: voonhous <[email protected]>
AuthorDate: Fri Dec 26 11:01:27 2025 +0800
refactor: Add Lombok annotations to hudi-kafka-connect (#17715)
---
hudi-kafka-connect/pom.xml | 6 +++
.../apache/hudi/connect/HoodieSinkConnector.java | 16 +++---
.../org/apache/hudi/connect/HoodieSinkTask.java | 29 +++++------
.../connect/KafkaConnectFileIdPrefixProvider.java | 10 ++--
.../connect/kafka/KafkaConnectControlAgent.java | 22 ++++----
.../hudi/connect/kafka/KafkaControlProducer.java | 6 +--
.../transaction/ConnectTransactionCoordinator.java | 58 ++++++++++------------
.../transaction/ConnectTransactionParticipant.java | 37 ++++++--------
.../hudi/connect/transaction/CoordinatorEvent.java | 25 ++--------
.../hudi/connect/transaction/TransactionInfo.java | 25 ++--------
.../hudi/connect/utils/KafkaConnectUtils.java | 14 +++---
.../connect/writers/AbstractConnectWriter.java | 5 +-
.../connect/writers/BufferedConnectWriter.java | 20 +++-----
.../writers/KafkaConnectTransactionServices.java | 19 +++----
.../writers/KafkaConnectWriterProvider.java | 6 +--
.../connect/TestConnectTransactionCoordinator.java | 7 +--
.../connect/TestConnectTransactionParticipant.java | 17 ++-----
.../org/apache/hudi/helper/MockKafkaConnect.java | 17 ++-----
.../apache/hudi/helper/TestHudiWriterProvider.java | 15 ++----
.../hudi/writers/TestAbstractConnectWriter.java | 6 +--
20 files changed, 136 insertions(+), 224 deletions(-)
diff --git a/hudi-kafka-connect/pom.xml b/hudi-kafka-connect/pom.xml
index ad655a6e0d5a..ab6045743071 100644
--- a/hudi-kafka-connect/pom.xml
+++ b/hudi-kafka-connect/pom.xml
@@ -129,6 +129,12 @@
<artifactId>kryo-shaded</artifactId>
</dependency>
+ <!-- Lombok -->
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkConnector.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkConnector.java
index 9850d1647078..046cb36a2682 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkConnector.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkConnector.java
@@ -18,11 +18,11 @@
package org.apache.hudi.connect;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
@@ -32,19 +32,15 @@ import java.util.Map;
/**
* HudiSinkConnector is a Kafka Connect Connector implementation
* that ingest data from Kafka to Hudi.
+ *
*/
+@NoArgsConstructor // No-arg constructor. It is instantiated by Connect framework.
+@Slf4j
public class HoodieSinkConnector extends SinkConnector {
public static final String VERSION = "0.1.0";
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieSinkConnector.class);
private Map<String, String> configProps;
- /**
- * No-arg constructor. It is instantiated by Connect framework.
- */
- public HoodieSinkConnector() {
- }
-
@Override
public String version() {
return VERSION;
@@ -72,7 +68,7 @@ public class HoodieSinkConnector extends SinkConnector {
@Override
public void stop() {
- LOG.info(String.format("Shutting down Hudi Sink connector %s",
configProps.get("name")));
+ log.info("Shutting down Hudi Sink connector {}", configProps.get("name"));
}
@Override
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java
index 3b274f8b3ba9..bcf126ca7ddb 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java
@@ -27,6 +27,7 @@ import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
@@ -34,8 +35,6 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashMap;
@@ -47,10 +46,10 @@ import java.util.Map;
* from the assigned partitions and commit the Kafka offsets.
* Also, handles re-assignments of partitions.
*/
+@Slf4j
public class HoodieSinkTask extends SinkTask {
public static final String TASK_ID_CONFIG_NAME = "task.id";
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieSinkTask.class);
private final Map<TopicPartition, TransactionCoordinator>
transactionCoordinators;
private final Map<TopicPartition, TransactionParticipant>
transactionParticipants;
@@ -74,8 +73,7 @@ public class HoodieSinkTask extends SinkTask {
public void start(Map<String, String> props) {
connectorName = props.get("name");
taskId = props.get(TASK_ID_CONFIG_NAME);
- LOG.info(String.format("Starting Hudi Sink Task for %s connector %s with id %s with assignments %s",
- props, connectorName, taskId, context.assignment()));
+ log.info("Starting Hudi Sink Task for {} connector {} with id {} with assignments {}", props, connectorName, taskId, context.assignment());
try {
connectConfigs =
KafkaConnectConfigs.newBuilder().withProperties(props).build();
controlKafkaClient = KafkaConnectControlAgent.createKafkaControlManager(
@@ -84,8 +82,8 @@ public class HoodieSinkTask extends SinkTask {
} catch (ConfigException e) {
throw new ConnectException("Couldn't start HdfsSinkConnector due to
configuration error.", e);
} catch (ConnectException e) {
- LOG.error("Couldn't start HudiSinkConnector:", e);
- LOG.info("Shutting down HudiSinkConnector.");
+ log.error("Couldn't start HudiSinkConnector:", e);
+ log.info("Shutting down HudiSinkConnector.");
cleanup();
// Always throw the original exception that prevent us from starting
throw e;
@@ -147,13 +145,13 @@ public class HoodieSinkTask extends SinkTask {
@Override
public void open(Collection<TopicPartition> partitions) {
- LOG.info("New partitions added " + partitions.toString());
+ log.info("New partitions added {}", partitions.toString());
bootstrap(partitions);
}
@Override
public void close(Collection<TopicPartition> partitions) {
- LOG.info("Existing partitions deleted " + partitions.toString());
+ log.info("Existing partitions deleted {}", partitions.toString());
// Close any writers we have. We may get assigned the same partitions and
end up duplicating
// some effort since we'll have to reprocess those messages. It may be
possible to hold on to
// the TopicPartitionWriter and continue to use the temp file, but this
can get significantly
@@ -172,18 +170,17 @@ public class HoodieSinkTask extends SinkTask {
TransactionParticipant worker =
transactionParticipants.remove(partition);
if (worker != null) {
try {
- LOG.debug("Closing data writer due to task start failure.");
+ log.debug("Closing data writer due to task start failure.");
worker.stop();
} catch (Throwable t) {
- LOG.warn("Error closing and stopping data writer", t);
+ log.warn("Error closing and stopping data writer", t);
}
}
}
}
private void bootstrap(Collection<TopicPartition> partitions) {
- LOG.info(String.format("Bootstrap task for connector %s with id %s with assignments %s part %s",
- connectorName, taskId, context.assignment(), partitions));
+ log.info("Bootstrap task for connector {} with id {} with assignments {} part {}", connectorName, taskId, context.assignment(), partitions);
for (TopicPartition partition : partitions) {
try {
// If the partition is 0, instantiate the Leader
@@ -199,7 +196,7 @@ public class HoodieSinkTask extends SinkTask {
transactionParticipants.put(partition, worker);
worker.start();
} catch (HoodieException exception) {
- LOG.error(String.format("Fatal error initializing task %s for
partition %s", taskId, partition.partition()), exception);
+ log.error("Fatal error initializing task {} for partition {}", taskId,
partition.partition(), exception);
}
}
}
@@ -209,10 +206,10 @@ public class HoodieSinkTask extends SinkTask {
TransactionParticipant worker = transactionParticipants.get(partition);
if (worker != null) {
try {
- LOG.debug("Closing data writer due to task start failure.");
+ log.debug("Closing data writer due to task start failure.");
worker.stop();
} catch (Throwable t) {
- LOG.debug("Error closing and stopping data writer", t);
+ log.debug("Error closing and stopping data writer", t);
}
}
}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
index 59802decdb05..aa8165140215 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java
@@ -23,20 +23,19 @@ import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.FileIdPrefixProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {
public static final String KAFKA_CONNECT_PARTITION_ID =
"hudi.kafka.connect.partition";
- private static final Logger LOG =
LoggerFactory.getLogger(KafkaConnectFileIdPrefixProvider.class);
private final String kafkaPartition;
public KafkaConnectFileIdPrefixProvider(TypedProperties props) {
super(props);
if (!props.containsKey(KAFKA_CONNECT_PARTITION_ID)) {
- LOG.error("Fatal error due to Kafka Connect Partition Id is not set");
+ log.error("Fatal error due to Kafka Connect Partition Id is not set");
throw new HoodieException("Kafka Connect Partition Key " +
KAFKA_CONNECT_PARTITION_ID + " not provided");
}
this.kafkaPartition = props.getProperty(KAFKA_CONNECT_PARTITION_ID);
@@ -48,8 +47,7 @@ public class KafkaConnectFileIdPrefixProvider extends
FileIdPrefixProvider {
// to generate a fixed sized hash.
String rawFileIdPrefix = kafkaPartition + partitionPath;
String hashedPrefix = KafkaConnectUtils.hashDigest(rawFileIdPrefix);
- LOG.info("CreateFileId for Kafka Partition " + kafkaPartition + " : " +
partitionPath + " = " + rawFileIdPrefix
- + " === " + hashedPrefix);
+ log.info("CreateFileId for Kafka Partition {} : {} = {} === {}",
kafkaPartition, partitionPath, rawFileIdPrefix, hashedPrefix);
return hashedPrefix;
}
}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java
index 041fc03c5962..074a2a39938d 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaConnectControlAgent.java
@@ -22,6 +22,7 @@ import org.apache.hudi.connect.ControlMessage;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.transaction.TransactionParticipant;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -29,8 +30,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
@@ -50,9 +49,8 @@ import java.util.concurrent.TimeUnit;
* Use a single instance per worker (single-threaded),
* and register multiple tasks that can receive the control messages.
*/
+@Slf4j
public class KafkaConnectControlAgent implements KafkaControlAgent {
-
- private static final Logger LOG =
LoggerFactory.getLogger(KafkaConnectControlAgent.class);
private static final Object LOCK = new Object();
private static final long KAFKA_POLL_TIMEOUT_MS = 100;
private static final int EXEC_SHUTDOWN_TIMEOUT_MS = 5000;
@@ -143,7 +141,7 @@ public class KafkaConnectControlAgent implements
KafkaControlAgent {
records = consumer.poll(Duration.ofMillis(KAFKA_POLL_TIMEOUT_MS));
for (ConsumerRecord<String, byte[]> record : records) {
try {
- LOG.debug("Kafka consumerGroupId = {} topic = {}, partition = {},
offset = {}, customer = {}, country = {}",
+ log.debug("Kafka consumerGroupId = {} topic = {}, partition = {},
offset = {}, customer = {}, country = {}",
"", record.topic(), record.partition(), record.offset(),
record.key(), record.value());
ControlMessage message = ControlMessage.parseFrom(record.value());
String senderTopic = message.getTopicName();
@@ -154,23 +152,23 @@ public class KafkaConnectControlAgent implements
KafkaControlAgent {
partitionWorker.processControlEvent(message);
}
} else {
- LOG.warn("Failed to send message for unregistered participants
for topic {}", senderTopic);
+ log.warn("Failed to send message for unregistered participants
for topic {}", senderTopic);
}
} else if
(message.getReceiverType().equals(ControlMessage.EntityType.COORDINATOR)) {
if (topicCoordinators.containsKey(senderTopic)) {
topicCoordinators.get(senderTopic).processControlEvent(message);
}
} else {
- LOG.warn("Sender type of Control Message unknown {}",
message.getSenderType().name());
+ log.warn("Sender type of Control Message unknown {}",
message.getSenderType().name());
}
} catch (Exception e) {
- LOG.error("Fatal error while consuming a kafka record for topic =
{} partition = {}", record.topic(), record.partition(), e);
+ log.error("Fatal error while consuming a kafka record for topic =
{} partition = {}", record.topic(), record.partition(), e);
}
}
try {
consumer.commitSync();
} catch (CommitFailedException exception) {
- LOG.error("Fatal error while committing kafka control topic");
+ log.error("Fatal error while committing kafka control topic");
}
}
});
@@ -182,16 +180,16 @@ public class KafkaConnectControlAgent implements
KafkaControlAgent {
if (executorService != null) {
boolean terminated = false;
try {
- LOG.info("Shutting down executor service.");
+ log.info("Shutting down executor service.");
executorService.shutdown();
- LOG.info("Awaiting termination.");
+ log.info("Awaiting termination.");
terminated =
executorService.awaitTermination(EXEC_SHUTDOWN_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignored
}
if (!terminated) {
- LOG.warn(
+ log.warn(
"Unclean Kafka Control Manager executor service shutdown ");
executorService.shutdownNow();
}
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java
index 2161d88485d2..5dde8795f362 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/kafka/KafkaControlProducer.java
@@ -20,14 +20,13 @@ package org.apache.hudi.connect.kafka;
import org.apache.hudi.connect.ControlMessage;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Properties;
@@ -36,10 +35,9 @@ import java.util.Properties;
* Control Topic that coordinates transactions
* across Participants.
*/
+@Slf4j
public class KafkaControlProducer {
- private static final Logger LOG =
LoggerFactory.getLogger(KafkaControlProducer.class);
-
private final String bootstrapServers;
private final String controlTopicName;
private Producer<String, byte[]> producer;
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
index e7762b942f37..b3bd854a1e7e 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java
@@ -28,9 +28,9 @@ import org.apache.hudi.connect.writers.KafkaConnectConfigs;
import org.apache.hudi.connect.writers.KafkaConnectTransactionServices;
import org.apache.hudi.exception.HoodieException;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -52,11 +52,10 @@ import java.util.stream.Collectors;
* coordinates the Hudi write transactions
* across all the Kafka partitions for a single Kafka Topic.
*/
+@Slf4j
public class ConnectTransactionCoordinator implements TransactionCoordinator,
Runnable {
public static final int COORDINATOR_KAFKA_PARTITION = 0;
-
- private static final Logger LOG =
LoggerFactory.getLogger(ConnectTransactionCoordinator.class);
private static final String BOOTSTRAP_SERVERS_CFG = "bootstrap.servers";
private static final String KAFKA_OFFSET_KEY = "kafka.commit.offsets";
private static final String KAFKA_OFFSET_DELIMITER = ",";
@@ -66,6 +65,7 @@ public class ConnectTransactionCoordinator implements
TransactionCoordinator, Ru
private static final int COORDINATOR_EVENT_LOOP_TIMEOUT_MS = 1000;
private final KafkaConnectConfigs configs;
+ @Getter
private final TopicPartition partition;
private final KafkaControlAgent kafkaControlClient;
private final ConnectTransactionServices transactionServices;
@@ -120,8 +120,7 @@ public class ConnectTransactionCoordinator implements
TransactionCoordinator, Ru
executorService.submit(this);
}
kafkaControlClient.registerTransactionCoordinator(this);
- LOG.info(String.format("Start Transaction Coordinator for topic %s
partition %s",
- partition.topic(), partition.partition()));
+ log.info("Start Transaction Coordinator for topic {} partition {}",
partition.topic(), partition.partition());
initializeGlobalCommittedKafkaOffsets();
// Submit the first start commit
@@ -139,34 +138,29 @@ public class ConnectTransactionCoordinator implements
TransactionCoordinator, Ru
if (executorService != null) {
boolean terminated = false;
try {
- LOG.info("Shutting down executor service.");
+ log.info("Shutting down executor service.");
executorService.shutdown();
- LOG.info("Awaiting termination.");
+ log.info("Awaiting termination.");
terminated = executorService.awaitTermination(100,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignored
}
if (!terminated) {
- LOG.warn(
+ log.warn(
"Unclean Kafka Control Manager executor service shutdown ");
executorService.shutdownNow();
}
}
}
- @Override
- public TopicPartition getPartition() {
- return partition;
- }
-
@Override
public void processControlEvent(ControlMessage message) {
CoordinatorEvent.CoordinatorEventType type;
if (message.getType().equals(ControlMessage.EventType.WRITE_STATUS)) {
type = CoordinatorEvent.CoordinatorEventType.WRITE_STATUS;
} else {
- LOG.warn("Illegal message type [{}] processee by coordinator",
message.getType().name());
+ log.warn("Illegal message type [{}] processee by coordinator",
message.getType().name());
return;
}
@@ -186,7 +180,7 @@ public class ConnectTransactionCoordinator implements
TransactionCoordinator, Ru
processCoordinatorEvent(event);
}
} catch (InterruptedException exception) {
- LOG.warn("Error received while polling the event loop in Partition
Coordinator", exception);
+ log.warn("Error received while polling the event loop in Partition
Coordinator", exception);
}
}
}
@@ -223,7 +217,7 @@ public class ConnectTransactionCoordinator implements
TransactionCoordinator, Ru
&& currentState.equals(State.ENDED_COMMIT)) {
onReceiveWriteStatus(event.getMessage());
} else {
- LOG.warn("Could not process WRITE_STATUS due to missing message");
+ log.warn("Could not process WRITE_STATUS due to missing message");
}
break;
case ACK_COMMIT:
@@ -236,7 +230,7 @@ public class ConnectTransactionCoordinator implements
TransactionCoordinator, Ru
throw new IllegalStateException("Partition Coordinator has received
an illegal event type " + event.getEventType().name());
}
} catch (Exception exception) {
- LOG.warn("Error received while polling the event loop in Partition
Coordinator", exception);
+ log.warn("Error received while polling the event loop in Partition
Coordinator", exception);
}
}
@@ -253,7 +247,7 @@ public class ConnectTransactionCoordinator implements
TransactionCoordinator, Ru
currentCommitTime),
configs.getCommitIntervalSecs(), TimeUnit.SECONDS);
} catch (Exception exception) {
- LOG.error(String.format("Failed to start a new commit %s, will retry",
currentCommitTime), exception);
+ log.error("Failed to start a new commit {}, will retry",
currentCommitTime, exception);
submitEvent(new
CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
partition.topic(),
StringUtils.EMPTY_STRING),
@@ -265,7 +259,7 @@ public class ConnectTransactionCoordinator implements
TransactionCoordinator, Ru
try {
kafkaControlClient.publishMessage(buildControlMessage(ControlMessage.EventType.END_COMMIT));
} catch (Exception exception) {
- LOG.warn("Could not send END_COMMIT message for partition {} and
commitTime {}",
+ log.warn("Could not send END_COMMIT message for partition {} and
commitTime {}",
partition, currentCommitTime, exception);
}
currentConsumedKafkaOffsets.clear();
@@ -300,7 +294,7 @@ public class ConnectTransactionCoordinator implements
TransactionCoordinator, Ru
transformKafkaOffsets(currentConsumedKafkaOffsets));
if (success) {
- LOG.info("Commit " + currentCommitTime + " successful!");
+ log.info("Commit {} successful!", currentCommitTime);
currentState = State.WRITE_STATUS_RCVD;
globalCommittedKafkaOffsets.putAll(currentConsumedKafkaOffsets);
submitEvent(new
CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.ACK_COMMIT,
@@ -308,29 +302,29 @@ public class ConnectTransactionCoordinator implements
TransactionCoordinator, Ru
currentCommitTime));
return;
} else {
- LOG.error("Commit " + currentCommitTime + " failed!");
+ log.error("Commit {} failed!", currentCommitTime);
}
} else if (hasErrors) {
- LOG.error("Coordinator found errors when writing. Errors/Total=" +
totalErrorRecords + "/" + totalRecords);
- LOG.error("Printing out the top 100 errors");
+ log.error("Coordinator found errors when writing.
Errors/Total={}/{}", totalErrorRecords, totalRecords);
+ log.error("Printing out the top 100 errors");
allWriteStatuses.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws
-> {
- LOG.error("Global error :", ws.getGlobalError());
+ log.error("Global error :", ws.getGlobalError());
if (ws.getErrors().size() > 0) {
- ws.getErrors().forEach((key, value) -> LOG.trace("Error for
key:" + key + " is " + value));
+ ws.getErrors().forEach((key, value) -> log.trace("Error for
key:{} is {}", key, value));
}
});
}
// Submit the next start commit, that will rollback the current commit.
currentState = State.FAILED_COMMIT;
- LOG.warn("Current commit {} failed. Starting a new commit after
recovery delay of {} {}",
+ log.warn("Current commit {} failed. Starting a new commit after
recovery delay of {} {}",
currentCommitTime, RESTART_COMMIT_DELAY_MS,
TimeUnit.MILLISECONDS.name());
submitEvent(new
CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
partition.topic(),
StringUtils.EMPTY_STRING),
RESTART_COMMIT_DELAY_MS, TimeUnit.MILLISECONDS);
} catch (Exception exception) {
- LOG.error("Fatal error while committing file", exception);
+ log.error("Fatal error while committing file", exception);
}
}
}
@@ -339,7 +333,7 @@ public class ConnectTransactionCoordinator implements
TransactionCoordinator, Ru
// If we are still stuck in ENDED_STATE
if (currentState.equals(State.ENDED_COMMIT)) {
currentState = State.WRITE_STATUS_TIMEDOUT;
- LOG.warn("Current commit {} failed after a write status timeout.
Starting a new commit after recovery delay of {} {}",
+ log.warn("Current commit {} failed after a write status timeout.
Starting a new commit after recovery delay of {} {}",
currentCommitTime, RESTART_COMMIT_DELAY_MS,
TimeUnit.MILLISECONDS.name());
// Submit the next start commit
submitEvent(new
CoordinatorEvent(CoordinatorEvent.CoordinatorEventType.START_COMMIT,
@@ -353,7 +347,7 @@ public class ConnectTransactionCoordinator implements
TransactionCoordinator, Ru
try {
kafkaControlClient.publishMessage(buildControlMessage(ControlMessage.EventType.ACK_COMMIT));
} catch (Exception exception) {
- LOG.warn("Could not send ACK_COMMIT message for partition {} and
commitTime {}", partition, currentCommitTime, exception);
+ log.warn("Could not send ACK_COMMIT message for partition {} and
commitTime {}", partition, currentCommitTime, exception);
}
currentState = State.ACKED_COMMIT;
@@ -369,11 +363,11 @@ public class ConnectTransactionCoordinator implements
TransactionCoordinator, Ru
Map<String, String> commitMetadata =
transactionServices.fetchLatestExtraCommitMetadata();
String latestKafkaOffsets = commitMetadata.get(KAFKA_OFFSET_KEY);
if (!StringUtils.isNullOrEmpty(latestKafkaOffsets)) {
- LOG.info("Retrieved Raw Kafka offsets from Hudi Commit File " +
latestKafkaOffsets);
+ log.info("Retrieved Raw Kafka offsets from Hudi Commit File {}",
latestKafkaOffsets);
globalCommittedKafkaOffsets =
Arrays.stream(latestKafkaOffsets.split(KAFKA_OFFSET_DELIMITER))
.map(entry -> entry.split(KAFKA_OFFSET_KV_DELIMITER))
.collect(Collectors.toMap(entry -> Integer.parseInt(entry[0]),
entry -> Long.parseLong(entry[1])));
- LOG.info("Initialized the kafka offset commits " +
globalCommittedKafkaOffsets);
+ log.info("Initialized the kafka offset commits {}",
globalCommittedKafkaOffsets);
}
} catch (Exception exception) {
throw new HoodieException("Could not deserialize the kafka commit
offsets", exception);
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
index dda176c2ba9a..747a29f9606e 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java
@@ -28,11 +28,11 @@ import
org.apache.hudi.connect.writers.KafkaConnectWriterProvider;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.LinkedList;
@@ -44,12 +44,12 @@ import java.util.concurrent.LinkedBlockingQueue;
* Implementation of the {@link TransactionParticipant} that coordinates the
Hudi write transactions
* based on events from the {@link TransactionCoordinator} and manages the
Hudi Writes for a specific Kafka Partition.
*/
+@Slf4j
public class ConnectTransactionParticipant implements TransactionParticipant {
- private static final Logger LOG =
LoggerFactory.getLogger(ConnectTransactionParticipant.class);
-
private final LinkedList<SinkRecord> buffer;
private final BlockingQueue<ControlMessage> controlEvents;
+ @Getter
private final TopicPartition partition;
private final SinkTaskContext context;
private final KafkaControlAgent kafkaControlAgent;
@@ -81,7 +81,7 @@ public class ConnectTransactionParticipant implements
TransactionParticipant {
@Override
public void start() {
- LOG.info("Start Hudi Transaction Participant for partition " +
partition.partition());
+ log.info("Start Hudi Transaction Participant for partition {}",
partition.partition());
this.kafkaControlAgent.registerTransactionParticipant(this);
context.pause(partition);
}
@@ -107,11 +107,6 @@ public class ConnectTransactionParticipant implements
TransactionParticipant {
return committedKafkaOffset;
}
- @Override
- public TopicPartition getPartition() {
- return partition;
- }
-
@Override
public void processRecords() {
while (!controlEvents.isEmpty()) {
@@ -146,21 +141,21 @@ public class ConnectTransactionParticipant implements
TransactionParticipant {
syncKafkaOffsetWithLeader(message);
context.resume(partition);
String currentCommitTime = message.getCommitTime();
- LOG.info("Started a new transaction after receiving START_COMMIT for
commit " + currentCommitTime);
+ log.info("Started a new transaction after receiving START_COMMIT for
commit {}", currentCommitTime);
try {
ongoingTransactionInfo = new TransactionInfo<>(currentCommitTime,
writerProvider.getWriter(currentCommitTime));
ongoingTransactionInfo.setExpectedKafkaOffset(committedKafkaOffset);
} catch (Exception exception) {
- LOG.warn("Failed to start a new transaction", exception);
+ log.warn("Failed to start a new transaction", exception);
}
}
private void handleEndCommit(ControlMessage message) {
if (ongoingTransactionInfo == null) {
- LOG.warn("END_COMMIT {} is received while we were NOT in active
transaction", message.getCommitTime());
+ log.warn("END_COMMIT {} is received while we were NOT in active
transaction", message.getCommitTime());
return;
} else if
(!ongoingTransactionInfo.getCommitTime().equals(message.getCommitTime())) {
- LOG.error("Fatal error received END_COMMIT with commit time {} while
local transaction commit time {}",
+ log.error("Fatal error received END_COMMIT with commit time {} while
local transaction commit time {}",
message.getCommitTime(), ongoingTransactionInfo.getCommitTime());
// Recovery: A new END_COMMIT from leader caused interruption to an
existing transaction,
// explicitly reset Kafka commit offset to ensure no data loss
@@ -194,7 +189,7 @@ public class ConnectTransactionParticipant implements
TransactionParticipant {
kafkaControlAgent.publishMessage(writeStatusEvent);
} catch (Exception exception) {
- LOG.error(String.format("Error writing records and ending commit %s for
partition %s", message.getCommitTime(), partition.partition()), exception);
+ log.error("Error writing records and ending commit {} for partition {}",
message.getCommitTime(), partition.partition(), exception);
throw new HoodieIOException(String.format("Error writing records and
ending commit %s for partition %s", message.getCommitTime(),
partition.partition()),
new IOException(exception));
}
@@ -219,16 +214,16 @@ public class ConnectTransactionParticipant implements
TransactionParticipant {
ongoingTransactionInfo.getWriter().writeRecord(record);
ongoingTransactionInfo.setExpectedKafkaOffset(record.kafkaOffset()
+ 1);
} else if (record != null && record.kafkaOffset() >
ongoingTransactionInfo.getExpectedKafkaOffset()) {
- LOG.warn("Received a kafka record with offset {} above the next
expected kafka offset {} for partition {}. "
+ log.warn("Received a kafka record with offset {} above the next
expected kafka offset {} for partition {}. "
+ "Resetting the kafka offset to {}", record.kafkaOffset(),
ongoingTransactionInfo.getExpectedKafkaOffset(), partition,
ongoingTransactionInfo.getExpectedKafkaOffset());
context.offset(partition,
ongoingTransactionInfo.getExpectedKafkaOffset());
} else if (record != null && record.kafkaOffset() <
ongoingTransactionInfo.getExpectedKafkaOffset()) {
- LOG.info("Received a kafka record with offset {} below the next
expected kafka offset {} for partition {}. "
+ log.info("Received a kafka record with offset {} below the next
expected kafka offset {} for partition {}. "
+ "No action will be taken but this record will be ignored
since its already written", record.kafkaOffset(),
ongoingTransactionInfo.getExpectedKafkaOffset(), partition);
}
buffer.poll();
} catch (Exception exception) {
- LOG.warn("Failed to write records for transaction [{}] in partition
[{}]",
+ log.warn("Failed to write records for transaction [{}] in partition
[{}]",
ongoingTransactionInfo.getCommitTime(), partition.partition(),
exception);
}
}
@@ -241,7 +236,7 @@ public class ConnectTransactionParticipant implements
TransactionParticipant {
ongoingTransactionInfo.getWriter().close();
ongoingTransactionInfo = null;
} catch (HoodieIOException exception) {
- LOG.warn("Failed to cleanup existing transaction", exception);
+ log.warn("Failed to cleanup existing transaction", exception);
}
}
}
@@ -254,14 +249,14 @@ public class ConnectTransactionParticipant implements
TransactionParticipant {
if (coordinatorCommittedKafkaOffset != null &&
coordinatorCommittedKafkaOffset >= 0) {
// Debug only messages
if (coordinatorCommittedKafkaOffset != committedKafkaOffset) {
- LOG.warn("The coordinator offset for kafka partition {} is {} while
the locally committed offset is {}. "
+ log.warn("The coordinator offset for kafka partition {} is {} while
the locally committed offset is {}. "
+ "Resetting the local committed offset to the coordinator
provided one to ensure consistency", partition,
coordinatorCommittedKafkaOffset, committedKafkaOffset);
}
committedKafkaOffset = coordinatorCommittedKafkaOffset;
return;
}
} else {
- LOG.warn("The coordinator offset for kafka partition {} is not present
while the locally committed offset is {}. "
+ log.warn("The coordinator offset for kafka partition {} is not present
while the locally committed offset is {}. "
+ "Resetting the local committed offset to 0 to avoid data loss",
partition, committedKafkaOffset);
}
// If the coordinator does not have a committed offset for this partition,
reset to zero offset.
diff --git
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java
index f9f467a83bec..a998afb191c8 100644
---
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java
+++
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/CoordinatorEvent.java
@@ -20,16 +20,21 @@ package org.apache.hudi.connect.transaction;
import org.apache.hudi.connect.ControlMessage;
+import lombok.Getter;
+import lombok.Setter;
+
/**
* The events within the Coordinator that trigger
* the state changes in the state machine of
* the Coordinator.
*/
+@Getter
public class CoordinatorEvent {
private final CoordinatorEventType eventType;
private final String topicName;
private final String commitTime;
+ @Setter
private ControlMessage message;
public CoordinatorEvent(CoordinatorEventType eventType,
@@ -40,26 +45,6 @@ public class CoordinatorEvent {
this.commitTime = commitTime;
}
- public CoordinatorEventType getEventType() {
- return eventType;
- }
-
- public String getTopicName() {
- return topicName;
- }
-
- public String getCommitTime() {
- return commitTime;
- }
-
- public ControlMessage getMessage() {
- return message;
- }
-
- public void setMessage(ControlMessage message) {
- this.message = message;
- }
-
/**
* The type of Coordinator Event.
*/
diff --git
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java
index 7c1852e5fa5c..68e9da875b0f 100644
---
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java
+++
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionInfo.java
@@ -20,15 +20,20 @@ package org.apache.hudi.connect.transaction;
import org.apache.hudi.connect.writers.ConnectWriter;
+import lombok.Getter;
+import lombok.Setter;
+
/**
* Stores all the state for the current Transaction within a
* {@link TransactionParticipant}.
* @param <T> The type of status returned by the underlying writer.
*/
+@Getter
public class TransactionInfo<T> {
private final String commitTime;
private final ConnectWriter<T> writer;
+ @Setter
private long expectedKafkaOffset;
private boolean commitInitiated;
@@ -39,26 +44,6 @@ public class TransactionInfo<T> {
this.commitInitiated = false;
}
- public String getCommitTime() {
- return commitTime;
- }
-
- public ConnectWriter<T> getWriter() {
- return writer;
- }
-
- public long getExpectedKafkaOffset() {
- return expectedKafkaOffset;
- }
-
- public boolean isCommitInitiated() {
- return commitInitiated;
- }
-
- public void setExpectedKafkaOffset(long expectedKafkaOffset) {
- this.expectedKafkaOffset = expectedKafkaOffset;
- }
-
public void commitInitiated() {
this.commitInitiated = true;
}
diff --git
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index 266a2d932b71..d77873065b9b 100644
---
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -39,13 +39,12 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.storage.StorageConfiguration;
import com.google.protobuf.ByteString;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.FileVisitOption;
@@ -67,9 +66,8 @@ import static
org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
/**
* Helper methods for Kafka.
*/
+@Slf4j
public class KafkaConnectUtils {
-
- private static final Logger LOG =
LoggerFactory.getLogger(KafkaConnectUtils.class);
private static final String HOODIE_CONF_PREFIX = "hoodie.";
public static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
public static final String HADOOP_HOME = "HADOOP_HOME";
@@ -82,10 +80,10 @@ public class KafkaConnectUtils {
String hadoopHomePath = System.getenv(HADOOP_HOME);
DEFAULT_HADOOP_CONF_FILES.addAll(getHadoopConfigFiles(hadoopConfigPath,
hadoopHomePath));
if (!DEFAULT_HADOOP_CONF_FILES.isEmpty()) {
- LOG.info(String.format("Found Hadoop default config files %s",
DEFAULT_HADOOP_CONF_FILES));
+ log.info("Found Hadoop default config files {}",
DEFAULT_HADOOP_CONF_FILES);
}
} catch (IOException e) {
- LOG.error("An error occurred while getting the default Hadoop
configuration. "
+ log.error("An error occurred while getting the default Hadoop
configuration. "
+ "Please use hadoop.conf.dir or hadoop.home to configure Hadoop
environment variables", e);
}
}
@@ -127,7 +125,7 @@ public class KafkaConnectUtils {
Map<String, KafkaFuture<TopicDescription>> values = result.values();
KafkaFuture<TopicDescription> topicDescription = values.get(topicName);
int numPartitions = topicDescription.get().partitions().size();
- LOG.info(String.format("Latest number of partitions for topic %s is %s",
topicName, numPartitions));
+ log.info("Latest number of partitions for topic {} is {}", topicName,
numPartitions);
return numPartitions;
} catch (Exception exception) {
throw new HoodieException("Fatal error fetching the latest partition of
kafka topic name" + topicName, exception);
@@ -221,7 +219,7 @@ public class KafkaConnectUtils {
try {
md = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
- LOG.error("Fatal error selecting hash algorithm", e);
+ log.error("Fatal error selecting hash algorithm", e);
throw new HoodieException(e);
}
byte[] digest =
Objects.requireNonNull(md).digest(getUTF8Bytes(stringToHash));
diff --git
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
index ff53c9dc28b9..70fefe431603 100644
---
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
+++
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java
@@ -29,10 +29,9 @@ import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.connect.sink.SinkRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@@ -43,12 +42,12 @@ import java.util.List;
* converting them to {@link HoodieRecord}s that can be written to Hudi by
* the derived implementations of this class.
*/
+@Slf4j
public abstract class AbstractConnectWriter implements
ConnectWriter<WriteStatus> {
public static final String KAFKA_AVRO_CONVERTER =
"io.confluent.connect.avro.AvroConverter";
public static final String KAFKA_JSON_CONVERTER =
"org.apache.kafka.connect.json.JsonConverter";
public static final String KAFKA_STRING_CONVERTER =
"org.apache.kafka.connect.storage.StringConverter";
- private static final Logger LOG =
LoggerFactory.getLogger(AbstractConnectWriter.class);
protected final String instantTime;
private final KeyGenerator keyGenerator;
diff --git
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
index da6fe5db7da1..5956956479dd 100644
---
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
+++
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java
@@ -35,9 +35,8 @@ import org.apache.hudi.io.IOUtils;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.SchemaProvider;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -48,10 +47,9 @@ import java.util.List;
* Specific implementation of a Hudi Writer that buffers all incoming records,
* and writes them to Hudi files on the end of a transaction using Bulk Insert.
*/
+@Slf4j
public class BufferedConnectWriter extends AbstractConnectWriter {
- private static final Logger LOG =
LoggerFactory.getLogger(BufferedConnectWriter.class);
-
private final HoodieEngineContext context;
private final HoodieJavaWriteClient writeClient;
private final HoodieWriteConfig config;
@@ -75,7 +73,7 @@ public class BufferedConnectWriter extends
AbstractConnectWriter {
try {
// Load and batch all incoming records in a map
long memoryForMerge =
IOUtils.getMaxMemoryPerPartitionMerge(context.getTaskContextSupplier(), config);
- LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
+ log.info("MaxMemoryPerPartitionMerge => {}", memoryForMerge);
this.bufferedRecords = new ExternalSpillableMap<>(memoryForMerge,
config.getSpillableMapBasePath(),
new DefaultSizeEstimator(),
@@ -97,12 +95,10 @@ public class BufferedConnectWriter extends
AbstractConnectWriter {
@Override
public List<WriteStatus> flushRecords() {
try {
- LOG.info("Number of entries in MemoryBasedMap => "
- + bufferedRecords.getInMemoryMapNumEntries()
- + ", Total size in bytes of MemoryBasedMap => "
- + bufferedRecords.getCurrentInMemoryMapSize() + ", Number of entries
in BitCaskDiskMap => "
- + bufferedRecords.getDiskBasedMapNumEntries() + ", Size of file
spilled to disk => "
- + bufferedRecords.getSizeOfFileOnDiskInBytes());
+ log.info("Number of entries in MemoryBasedMap => {}, Total size in bytes
of MemoryBasedMap => {}, "
+ + "Number of entries in BitCaskDiskMap => {}, Size of file
spilled to disk => {}",
+ bufferedRecords.getInMemoryMapNumEntries(),
bufferedRecords.getCurrentInMemoryMapSize(),
+ bufferedRecords.getDiskBasedMapNumEntries(),
bufferedRecords.getSizeOfFileOnDiskInBytes());
List<WriteStatus> writeStatuses = new ArrayList<>();
boolean isMorTable =
Option.ofNullable(connectConfigs.getString(HoodieTableConfig.TYPE))
@@ -122,7 +118,7 @@ public class BufferedConnectWriter extends
AbstractConnectWriter {
}
}
bufferedRecords.close();
- LOG.info("Flushed hudi records and got writeStatuses: " + writeStatuses);
+ log.info("Flushed hudi records and got writeStatuses: {}",
writeStatuses);
return writeStatuses;
} catch (Exception e) {
throw new HoodieIOException("Write records failed", new IOException(e));
diff --git
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index c7a4d6314c59..21c76f5aa8c6 100644
---
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -40,10 +40,9 @@ import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
@@ -57,10 +56,9 @@ import java.util.Set;
* {@link TransactionCoordinator}
* using {@link HoodieJavaWriteClient}.
*/
+@Slf4j
public class KafkaConnectTransactionServices implements
ConnectTransactionServices {
- private static final Logger LOG =
LoggerFactory.getLogger(KafkaConnectTransactionServices.class);
-
private final KafkaConnectConfigs connectConfigs;
private final Option<HoodieTableMetaClient> tableMetaClient;
private final StorageConfiguration<Configuration> storageConf;
@@ -91,8 +89,7 @@ public class KafkaConnectTransactionServices implements
ConnectTransactionServic
String partitionColumns =
KafkaConnectUtils.getPartitionColumnsForKeyGenerator(keyGenerator,
connectConfigs.getProps());
- LOG.info(String.format("Setting record key %s and partition fields %s
for table %s",
- recordKeyFields, partitionColumns, tableBasePath + tableName));
+ log.info("Setting record key {} and partition fields {} for table {}",
recordKeyFields, partitionColumns, tableBasePath + tableName);
tableMetaClient = Option.of(HoodieTableMetaClient.newTableBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE.name())
@@ -116,7 +113,7 @@ public class KafkaConnectTransactionServices implements
ConnectTransactionServic
public String startCommit() {
String newCommitTime = javaClient.startCommit();
javaClient.transitionInflight(newCommitTime);
- LOG.info("Starting Hudi commit " + newCommitTime);
+ log.info("Starting Hudi commit {}", newCommitTime);
return newCommitTime;
}
@@ -124,16 +121,16 @@ public class KafkaConnectTransactionServices implements
ConnectTransactionServic
public boolean endCommit(String commitTime, List<WriteStatus> writeStatuses,
Map<String, String> extraMetadata) {
boolean success = javaClient.commit(commitTime, writeStatuses,
Option.of(extraMetadata));
if (success) {
- LOG.info("Ending Hudi commit " + commitTime);
+ log.info("Ending Hudi commit {}", commitTime);
// Schedule clustering and compaction as needed.
if (writeConfig.isAsyncClusteringEnabled()) {
javaClient.scheduleClustering(Option.empty()).ifPresent(
- instantTs -> LOG.info("Scheduled clustering at instant time:" +
instantTs));
+ instantTs -> log.info("Scheduled clustering at instant time:{}",
instantTs));
}
if (isAsyncCompactionEnabled()) {
javaClient.scheduleCompaction(Option.empty()).ifPresent(
- instantTs -> LOG.info("Scheduled compaction at instant time:" +
instantTs));
+ instantTs -> log.info("Scheduled compaction at instant time:{}",
instantTs));
}
syncMeta();
}
@@ -147,7 +144,7 @@ public class KafkaConnectTransactionServices implements
ConnectTransactionServic
if (metadata.isPresent()) {
return metadata.get().getExtraMetadata();
} else {
- LOG.info("Hoodie Extra Metadata from latest commit is absent");
+ log.info("Hoodie Extra Metadata from latest commit is absent");
return Collections.emptyMap();
}
}
diff --git
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
index acf8baacf3e9..92ed7de4484e 100644
---
a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
+++
b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
@@ -41,10 +41,9 @@ import
org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.schema.SchemaProvider;
import org.apache.hudi.storage.StorageConfiguration;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collections;
@@ -52,10 +51,9 @@ import java.util.Collections;
* Provides the Hudi Writer for the {@link
org.apache.hudi.connect.transaction.TransactionParticipant}
* to write the incoming records to Hudi.
*/
+@Slf4j
public class KafkaConnectWriterProvider implements
ConnectWriterProvider<WriteStatus> {
- private static final Logger LOG =
LoggerFactory.getLogger(KafkaConnectWriterProvider.class);
-
private final KafkaConnectConfigs connectConfigs;
private final HoodieEngineContext context;
private final HoodieWriteConfig writeConfig;
diff --git
a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
index d939351a58f6..5ce1710bb785 100644
---
a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
+++
b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.helper.MockConnectTransactionServices;
import org.apache.hudi.helper.MockKafkaControlAgent;
+import lombok.Getter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.BeforeEach;
@@ -107,6 +108,7 @@ public class TestConnectTransactionCoordinator {
private static class MockParticipant implements TransactionParticipant {
private final MockKafkaControlAgent kafkaControlAgent;
+ @Getter
private final TopicPartition partition;
private final CountDownLatch latch;
private final TestScenarios testScenario;
@@ -148,11 +150,6 @@ public class TestConnectTransactionCoordinator {
public void processRecords() {
}
- @Override
- public TopicPartition getPartition() {
- return partition;
- }
-
@Override
public void processControlEvent(ControlMessage message) {
assertEquals(message.getSenderType(),
ControlMessage.EntityType.COORDINATOR);
diff --git
a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
index a1c1c7ac849c..50c3c45bcfe2 100644
---
a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
+++
b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java
@@ -28,6 +28,7 @@ import org.apache.hudi.helper.MockKafkaConnect;
import org.apache.hudi.helper.MockKafkaControlAgent;
import org.apache.hudi.helper.TestHudiWriterProvider;
+import lombok.Getter;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
@@ -194,9 +195,12 @@ public class TestConnectTransactionParticipant {
}
private final KafkaControlAgent kafkaControlAgent;
+ @Getter
private final TopicPartition partition;
+ @Getter
private Option<ControlMessage> lastReceivedWriteStatusEvent;
+ @Getter
private long committedKafkaOffset;
public MockCoordinator(KafkaControlAgent kafkaControlAgent) {
@@ -229,14 +233,6 @@ public class TestConnectTransactionParticipant {
}
}
- public Option<ControlMessage> getLastReceivedWriteStatusEvent() {
- return lastReceivedWriteStatusEvent;
- }
-
- public long getCommittedKafkaOffset() {
- return committedKafkaOffset;
- }
-
@Override
public void start() {
kafkaControlAgent.registerTransactionCoordinator(this);
@@ -247,11 +243,6 @@ public class TestConnectTransactionParticipant {
kafkaControlAgent.deregisterTransactionCoordinator(this);
}
- @Override
- public TopicPartition getPartition() {
- return partition;
- }
-
@Override
public void processControlEvent(ControlMessage message) {
if (message.getType().equals(ControlMessage.EventType.WRITE_STATUS)) {
diff --git
a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaConnect.java
b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaConnect.java
index 66ee2b597cf7..ea0209c419f9 100644
---
a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaConnect.java
+++
b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/MockKafkaConnect.java
@@ -20,6 +20,8 @@ package org.apache.hudi.helper;
import org.apache.hudi.connect.transaction.TransactionParticipant;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -44,8 +46,11 @@ public class MockKafkaConnect implements SinkTaskContext {
private final TopicPartition testPartition;
+ @Setter
private TransactionParticipant participant;
+ @Getter
private long currentKafkaOffset;
+ @Getter
private boolean isPaused;
private boolean isResetOffset;
@@ -56,22 +61,10 @@ public class MockKafkaConnect implements SinkTaskContext {
isResetOffset = false;
}
- public void setParticipant(TransactionParticipant participant) {
- this.participant = participant;
- }
-
- public boolean isPaused() {
- return isPaused;
- }
-
public boolean isResumed() {
return !isPaused;
}
- public long getCurrentKafkaOffset() {
- return currentKafkaOffset;
- }
-
@Override
public void pause(TopicPartition... partitions) {
if (Arrays.stream(partitions).allMatch(testPartition::equals)) {
diff --git
a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestHudiWriterProvider.java
b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestHudiWriterProvider.java
index 45c9b03725f5..87fd5ab36e10 100644
---
a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestHudiWriterProvider.java
+++
b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestHudiWriterProvider.java
@@ -22,6 +22,8 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.connect.writers.ConnectWriter;
import org.apache.hudi.connect.writers.ConnectWriterProvider;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
import org.apache.kafka.connect.sink.SinkRecord;
import java.util.List;
@@ -30,13 +32,11 @@ import java.util.List;
* Helper class the provides a Hudi writer and
* maintains stats that are used for test validation.
*/
+@NoArgsConstructor
public class TestHudiWriterProvider implements
ConnectWriterProvider<WriteStatus> {
private TestHudiWriter currentWriter;
- public TestHudiWriterProvider() {
- }
-
public int getLatestNumberWrites() {
return (currentWriter != null) ? currentWriter.numberRecords : 0;
}
@@ -51,6 +51,7 @@ public class TestHudiWriterProvider implements
ConnectWriterProvider<WriteStatus
return currentWriter;
}
+ @Getter
private static class TestHudiWriter implements ConnectWriter<WriteStatus> {
private int numberRecords;
@@ -61,14 +62,6 @@ public class TestHudiWriterProvider implements
ConnectWriterProvider<WriteStatus
this.isClosed = false;
}
- public int getNumberRecords() {
- return numberRecords;
- }
-
- public boolean isClosed() {
- return isClosed;
- }
-
@Override
public void writeRecord(SinkRecord record) {
numberRecords++;
diff --git
a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
index ce279c89df6a..f9bd2cccbb1d 100644
---
a/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
+++
b/hudi-kafka-connect/src/test/java/org/apache/hudi/writers/TestAbstractConnectWriter.java
@@ -34,6 +34,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.schema.SchemaProvider;
+import lombok.Getter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -147,6 +148,7 @@ public class TestAbstractConnectWriter {
private static class AbstractHudiConnectWriterTestWrapper extends
AbstractConnectWriter {
+ @Getter
private List<HoodieRecord> writtenRecords;
public AbstractHudiConnectWriterTestWrapper(KafkaConnectConfigs
connectConfigs, KeyGenerator keyGenerator, SchemaProvider schemaProvider) {
@@ -154,10 +156,6 @@ public class TestAbstractConnectWriter {
writtenRecords = new ArrayList<>();
}
- public List<HoodieRecord> getWrittenRecords() {
- return writtenRecords;
- }
-
@Override
protected void writeHudiRecord(HoodieRecord<?> record) {
writtenRecords.add(record);