This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new dc339df94ab [fix][io] KCA: handle kafka sources that use commitRecord 
(#20121)
dc339df94ab is described below

commit dc339df94abf89e8f8513c37adeb261931489813
Author: Andrey Yegorov <[email protected]>
AuthorDate: Tue Apr 18 14:37:41 2023 -0700

    [fix][io] KCA: handle kafka sources that use commitRecord (#20121)
    
    (cherry picked from commit 46a65fdc182a34753ebabfa35f63c0f6c765462f)
---
 .../io/kafka/connect/KafkaConnectSource.java       |  43 +++++++-
 .../kafka/connect/ErrRecFileStreamSourceTask.java  |  33 ++++++
 .../connect/KafkaConnectSourceErrRecTest.java      | 118 +++++++++++++++++++++
 3 files changed, 193 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index f5f6efd08bd..f2ee8a8e6ca 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -27,6 +27,8 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.pulsar.client.api.Schema;
@@ -57,6 +59,7 @@ public class KafkaConnectSource extends 
AbstractKafkaConnectSource<KeyValue<byte
             config.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
         }
         log.info("jsonWithEnvelope: {}", jsonWithEnvelope);
+
         super.open(config, sourceContext);
     }
 
@@ -69,17 +72,26 @@ public class KafkaConnectSource extends 
AbstractKafkaConnectSource<KeyValue<byte
 
     private static final AvroData avroData = new AvroData(1000);
 
-    private class KafkaSourceRecord extends 
AbstractKafkaSourceRecord<KeyValue<byte[], byte[]>>
+    public class KafkaSourceRecord extends 
AbstractKafkaSourceRecord<KeyValue<byte[], byte[]>>
             implements KVRecord<byte[], byte[]> {
 
+        final int keySize;
+        final int valueSize;
+
+        final SourceRecord srcRecord;
+
         KafkaSourceRecord(SourceRecord srcRecord) {
             super(srcRecord);
+            this.srcRecord = srcRecord;
+
             byte[] keyBytes = keyConverter.fromConnectData(
                     srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
+            keySize = keyBytes != null ? keyBytes.length : 0;
             this.key = keyBytes != null ? 
Optional.of(Base64.getEncoder().encodeToString(keyBytes)) : Optional.empty();
 
             byte[] valueBytes = valueConverter.fromConnectData(
                     srcRecord.topic(), srcRecord.valueSchema(), 
srcRecord.value());
+            valueSize = valueBytes != null ? valueBytes.length : 0;
 
             this.value = new KeyValue<>(keyBytes, valueBytes);
 
@@ -145,6 +157,35 @@ public class KafkaConnectSource extends 
AbstractKafkaConnectSource<KeyValue<byte
             }
         }
 
        @Override
        public void ack() {
            // First commit THIS record via SourceTask.commitRecord() (some Kafka
            // source tasks, e.g. ones doing per-record bookkeeping, require it),
            // then call super.ack(), which calls commit() after the complete
            // batch of records is processed.
            try {
                if (log.isDebugEnabled()) {
                    log.debug("commitRecord() for record: {}", srcRecord);
                }
                // There is no real Kafka producer here (Pulsar is the sink), so a
                // synthetic RecordMetadata is built from the source record itself.
                getSourceTask().commitRecord(srcRecord,
                        new RecordMetadata(
                                new TopicPartition(srcRecord.topic() == null
                                            ? topicName.orElse("UNDEFINED")
                                            : srcRecord.topic(),
                                        srcRecord.kafkaPartition() == null ? 0 : srcRecord.kafkaPartition()),
                                -1L, // baseOffset == -1L means no offset
                                0, // batchIndex, doesn't matter if baseOffset == -1L
                                null == srcRecord.timestamp() ? -1L : srcRecord.timestamp(),
                                keySize, // serializedKeySize
                                valueSize // serializedValueSize
                        ));
            } catch (InterruptedException e) {
                // Restore interrupt status, and deliberately skip super.ack():
                // the offset is not committed, so the source task is expected to
                // resend the record (duplicates are possible, data is not lost).
                Thread.currentThread().interrupt();
                log.error("Source task failed to commit record, "
                        + "source task should resend data, will get duplicate", e);
                return;
            }
            super.ack();
        }
+
     }
 
 }
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/ErrRecFileStreamSourceTask.java
 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/ErrRecFileStreamSourceTask.java
new file mode 100644
index 00000000000..cbdd4c41bf6
--- /dev/null
+++ 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/ErrRecFileStreamSourceTask.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.connect.file.FileStreamSourceTask;
+import org.apache.kafka.connect.source.SourceRecord;
+
+public class ErrRecFileStreamSourceTask extends FileStreamSourceTask {
+
+    @Override
+    public void commitRecord(SourceRecord record, RecordMetadata metadata) 
throws InterruptedException {
+        throw new org.apache.kafka.connect.errors.ConnectException("blah");
+    }
+
+}
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceErrRecTest.java
 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceErrRecTest.java
new file mode 100644
index 00000000000..9872e1fbc7e
--- /dev/null
+++ 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceErrRecTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka.connect;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.file.FileStreamSourceConnector;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SourceContext;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.io.File;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
/**
 * Test the implementation of {@link KafkaConnectSource}.
 *
 * Verifies that a failure thrown from the source task's commitRecord()
 * (see {@link ErrRecFileStreamSourceTask}) propagates out of Record#ack().
 */
@Slf4j
public class KafkaConnectSourceErrRecTest extends ProducerConsumerBase  {

    private Map<String, Object> config = new HashMap<>();
    private String offsetTopicName;
    // The topic to publish data to, for kafkaSource
    private String topicName;
    private KafkaConnectSource kafkaConnectSource;
    private File tempFile;
    private SourceContext context;
    private PulsarClient client;

    @BeforeMethod
    @Override
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();

        // Use the task whose commitRecord() always throws ConnectException.
        config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.pulsar.io.kafka.connect.ErrRecFileStreamSourceTask");
        config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
        config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

        this.offsetTopicName = "persistent://my-property/my-ns/kafka-connect-source-offset";
        config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, offsetTopicName);

        this.topicName = "persistent://my-property/my-ns/kafka-connect-source";
        config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName);
        tempFile = File.createTempFile("some-file-name", null);
        config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsoluteFile().toString());
        config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));

        // Mock the SourceContext but hand out a real PulsarClient pointed at
        // the test broker, since the connector's offset store needs one.
        this.context = mock(SourceContext.class);
        this.client = PulsarClient.builder()
                .serviceUrl(brokerUrl.toString())
                .build();
        when(context.getPulsarClient()).thenReturn(this.client);
    }

    @AfterMethod(alwaysRun = true)
    @Override
    protected void cleanup() throws Exception {
        if (this.client != null) {
            this.client.close();
        }
        tempFile.delete();
        super.internalCleanup();
    }

    @Test
    public void testCommitRecordCalled() throws Exception {
        kafkaConnectSource = new KafkaConnectSource();
        kafkaConnectSource.open(config, context);

        // use FileStreamSourceConnector, each line is a record, need "\n" and end of each record.
        OutputStream os = Files.newOutputStream(tempFile.toPath());

        String line1 = "This is the first line\n";
        os.write(line1.getBytes());
        os.flush();
        os.close();

        Record<KeyValue<byte[], byte[]>> record = kafkaConnectSource.read();

        assertTrue(record instanceof KafkaConnectSource.KafkaSourceRecord);

        // ack() calls the task's commitRecord(), which throws ConnectException
        // (unchecked), so the exception must escape ack() unswallowed.
        try {
            record.ack();
            fail("expected exception");
        } catch (Exception e) {
            log.info("got exception", e);
            assertTrue(e instanceof org.apache.kafka.connect.errors.ConnectException);
        }
    }
}

Reply via email to