merlimat closed pull request #2184: Moved Record interface as part of functions
api
URL: https://github.com/apache/incubator-pulsar/pull/2184
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index d07c85ea7c..92219782c4 100644
---
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -56,7 +56,7 @@
@Slf4j
@PrepareForTest({CmdFunctions.class})
-@PowerMockIgnore({ "javax.management.*", "javax.ws.*",
"org.apache.logging.log4j.*", "org.apache.pulsar.io.core.*" })
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*",
"org.apache.logging.log4j.*", "org.apache.pulsar.io.core.*",
"org.apache.pulsar.functions.api.*" })
public class TestCmdSinks {
@ObjectFactory
@@ -155,7 +155,7 @@ public void testCliCorrect() throws Exception {
sinkConfig
);
}
-
+
@Test
public void testMissingTenant() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
@@ -816,8 +816,8 @@ public void testCmdSinkConfigFile(SinkConfig
testSinkConfig, SinkConfig expected
verify(updateSink).validateSinkConfigs(eq(expectedSinkConfig));
verify(localSinkRunner).validateSinkConfigs(eq(expectedSinkConfig));
}
-
-
+
+
@Test
public void testCliOverwriteConfigFile() throws Exception {
@@ -837,7 +837,7 @@ public void testCliOverwriteConfigFile() throws Exception {
testSinkConfig.setArchive(JAR_FILE_PATH + "-prime");
testSinkConfig.setResources(new Resources(CPU + 1, RAM + 1, DISK + 1));
testSinkConfig.setConfigs(createSink.parseConfigs("{\"created_at-prime\":\"Mon
Jul 02 00:33:15 +0000 2018\"}"));
-
+
SinkConfig expectedSinkConfig = getSinkConfig();
@@ -846,7 +846,7 @@ public void testCliOverwriteConfigFile() throws Exception {
new YAMLMapper().writeValue(file, testSinkConfig);
Assert.assertEquals(testSinkConfig,
CmdUtils.loadConfig(file.getAbsolutePath(), SinkConfig.class));
-
+
testMixCliAndConfigFile(
TENANT,
NAMESPACE,
@@ -866,7 +866,7 @@ public void testCliOverwriteConfigFile() throws Exception {
expectedSinkConfig
);
}
-
+
public void testMixCliAndConfigFile(
String tenant,
String namespace,
@@ -885,8 +885,8 @@ public void testMixCliAndConfigFile(
String sinkConfigFile,
SinkConfig sinkConfig
) throws Exception {
-
-
+
+
// test create sink
createSink.tenant = tenant;
createSink.namespace = namespace;
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
similarity index 86%
rename from pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
rename to
pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index 504d5f4f6b..38d6ed36d7 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
+++
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -16,14 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.core;
+package org.apache.pulsar.functions.api;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
-import org.apache.pulsar.common.api.EncryptionContext;
-
/**
* Pulsar Connect's Record interface. Record encapsulates the information
about a record being read from a Source.
*/
@@ -59,15 +57,6 @@
return Optional.empty();
}
- /**
- * Retrieves encryption-context that is attached to record.
- *
- * @return {@link Optional}<{@link EncryptionContext}>
- */
- default public Optional<EncryptionContext> getEncryptionCtx() {
- return Optional.empty();
- }
-
/**
* Retrieves user-defined properties attached to record.
*
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 33006b2c91..b479852b8c 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -53,6 +53,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
@@ -67,7 +68,6 @@
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index 4e2edbec23..05c2114edc 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -24,8 +24,7 @@
import lombok.AllArgsConstructor;
import lombok.Data;
-import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.functions.api.Record;
@Data
@AllArgsConstructor
@@ -58,12 +57,7 @@ public T getValue() {
return sourceRecord.getRecordSequence();
}
- @Override
- public Optional<EncryptionContext> getEncryptionCtx() {
- return sourceRecord.getEncryptionCtx();
- }
-
- @Override
+ @Override
public Map<String, String> getProperties() {
return sourceRecord.getProperties();
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 2170879ce3..8ae9ad0074 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -35,6 +35,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -45,7 +46,6 @@
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 7a6d11b1bd..c4a1657414 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -30,13 +30,12 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.io.core.Record;
@Builder
@Getter
@ToString
@EqualsAndHashCode
-public class PulsarRecord<T> implements Record<T> {
+public class PulsarRecord<T> implements RecordWithEncryptionContext<T> {
private final String topicName;
private final int partition;
@@ -67,11 +66,7 @@
return Optional.of(Utils.getSequenceId(message.getMessageId()));
}
- /**
- * Retrieves encryption-context that is attached to record.
- *
- * @return {@link Optional}<{@link EncryptionContext}>
- */
+ @Override
public Optional<EncryptionContext> getEncryptionCtx() {
return message.getEncryptionCtx();
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index d5a1df9ada..f70100d268 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -30,13 +30,13 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/RecordWithEncryptionContext.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/RecordWithEncryptionContext.java
new file mode 100644
index 0000000000..5ca78e29a8
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/RecordWithEncryptionContext.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.source;
+
+import java.util.Optional;
+
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.functions.api.Record;
+
+public interface RecordWithEncryptionContext<T> extends Record<T> {
+
+ /**
+ * Retrieves encryption-context that is attached to record.
+ *
+ * @return {@link Optional}<{@link EncryptionContext}>
+ */
+ Optional<EncryptionContext> getEncryptionCtx();
+}
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 415912a7eb..866b92e8ed 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -54,6 +54,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
@@ -68,7 +69,6 @@
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
diff --git
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
index 6ea657e659..fe3787a0ce 100644
---
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
+++
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
@@ -36,8 +36,8 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.slf4j.Logger;
diff --git
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
index 40ffd9035b..bac07a0e7b 100644
---
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
+++
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
@@ -19,8 +19,8 @@
package org.apache.pulsar.io.aerospike;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
/**
* Aerospike sink that treats incoming messages on the input topic as Strings
diff --git
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
index fd76c0c630..d40a7cec5b 100644
---
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
+++
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
@@ -30,8 +30,8 @@
import java.util.Map;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
diff --git
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
index c2e72b8b3e..4e7feb5b6e 100644
---
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
+++
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
@@ -19,8 +19,8 @@
package org.apache.pulsar.io.cassandra;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
/**
* Cassandra sink that treats incoming messages on the input topic as Strings
diff --git a/pulsar-io/core/pom.xml b/pulsar-io/core/pom.xml
index 003e6d8611..05307ce3b7 100644
--- a/pulsar-io/core/pom.xml
+++ b/pulsar-io/core/pom.xml
@@ -38,16 +38,10 @@
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>pulsar-common</artifactId>
+ <artifactId>pulsar-functions-api</artifactId>
<version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
</dependency>
-
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>protobuf-shaded</artifactId>
diff --git
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
index 44d8162cab..13680c9211 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
@@ -21,6 +21,8 @@
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.pulsar.functions.api.Record;
+
/**
* Pulsar's Push Source interface. PushSource read data from
* external sources(database changes, twitter firehose, etc)
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
index 51d635ac31..16ed3c4c99 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
@@ -20,6 +20,8 @@
import java.util.Map;
+import org.apache.pulsar.functions.api.Record;
+
/**
* Generic sink interface users can implement to run Sink on top of Pulsar
Functions
*/
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
index e311761af1..a343844abd 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
@@ -20,6 +20,8 @@
import java.util.Map;
+import org.apache.pulsar.functions.api.Record;
+
public interface Source<T> extends AutoCloseable {
/**
* Open connector with configuration
diff --git
a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
index bb1100cf80..0980bd1562 100644
--- a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.core;
+import org.apache.pulsar.functions.api.Record;
import org.testng.annotations.Test;
import java.util.HashMap;
diff --git
a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java
b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java
index 18ad5c2321..a281a54de4 100644
--- a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.core;
+import org.apache.pulsar.functions.api.Record;
import org.testng.annotations.Test;
import java.util.HashMap;
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index d6acae83c7..a92a368f1c 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -29,8 +29,8 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index bbafc8e5eb..494d91b344 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -25,8 +25,8 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
index aab44748cb..89e3e7f8d6 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
@@ -19,8 +19,8 @@
package org.apache.pulsar.io.kafka;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
/**
* Kafka sink that treats incoming messages on the input topic as Strings
diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml
index f0c2776fab..1917e01889 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/kinesis/pom.xml
@@ -38,6 +38,13 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-instance</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
@@ -81,7 +88,7 @@
<version>0.12.8</version>
</dependency>
<!-- /kinesis dependencies -->
-
+
<dependency>
<groupId>com.google.flatbuffers</groupId>
<artifactId>flatbuffers-java</artifactId>
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index c3c2c452fb..dc70b98c5c 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -47,7 +47,7 @@
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
index a7382b80e0..47f9222f95 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -30,7 +30,8 @@
import java.util.Optional;
import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.RecordWithEncryptionContext;
import org.apache.pulsar.io.kinesis.fbs.EncryptionCtx;
import org.apache.pulsar.io.kinesis.fbs.EncryptionKey;
import org.apache.pulsar.io.kinesis.fbs.KeyValue;
@@ -67,7 +68,9 @@ public static ByteBuffer
serializeRecordToFlatBuffer(Record<byte[]> record) {
public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder
builder, Record<byte[]> record) {
checkNotNull(record, "record-context can't be null");
- Optional<EncryptionContext> encryptionCtx = record.getEncryptionCtx();
+ Optional<EncryptionContext> encryptionCtx = (record instanceof
RecordWithEncryptionContext)
+ ? ((RecordWithEncryptionContext<byte[]>)
record).getEncryptionCtx()
+ : Optional.empty();
Map<String, String> properties = record.getProperties();
int encryptionCtxOffset = -1;
@@ -180,8 +183,11 @@ public static String serializeRecordToJson(Record<byte[]>
record) {
result.add(PROPERTIES_FIELD, properties);
}
- if (record.getEncryptionCtx().isPresent()) {
- EncryptionContext encryptionCtx = record.getEncryptionCtx().get();
+ Optional<EncryptionContext> optEncryptionCtx = (record instanceof
RecordWithEncryptionContext)
+ ? ((RecordWithEncryptionContext<byte[]>)
record).getEncryptionCtx()
+ : Optional.empty();
+ if (optEncryptionCtx.isPresent()) {
+ EncryptionContext encryptionCtx = optEncryptionCtx.get();
JsonObject encryptionCtxJson = new JsonObject();
JsonObject keyBase64Map = new JsonObject();
JsonObject keyMetadataMap = new JsonObject();
diff --git
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
index 45aa0c7237..e6f669f78b 100644
---
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
+++
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -35,7 +35,8 @@
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
-import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.RecordWithEncryptionContext;
import org.apache.pulsar.io.kinesis.fbs.KeyValue;
import org.apache.pulsar.io.kinesis.fbs.Message;
import org.testng.Assert;
@@ -201,7 +202,7 @@ public void testFbSerialization(boolean isEncryption)
throws Exception {
return new RecordImpl(data, properties, Optional.ofNullable(ctx));
}
- class RecordImpl implements Record<byte[]> {
+ class RecordImpl implements RecordWithEncryptionContext<byte[]> {
byte[] data;
Map<String, String> properties;
Optional<EncryptionContext> ectx;
diff --git
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index c05828fac1..2277cbb5f6 100644
---
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -32,8 +32,8 @@
import lombok.Data;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 494ae48e5b..76ea4ac250 100644
---
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -35,8 +35,8 @@
import java.util.Map;
import java.util.Optional;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services