This is an automated email from the ASF dual-hosted git repository.
cbornet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b193051bb70 Add HTTP Sink (#17581)
b193051bb70 is described below
commit b193051bb70f0432bd68cc64b9025a653abab3d8
Author: Christophe Bornet <[email protected]>
AuthorDate: Thu Oct 27 20:05:29 2022 +0200
Add HTTP Sink (#17581)
---
deployment/terraform-ansible/deploy-pulsar.yaml | 1 +
distribution/io/src/assemble/io.xml | 1 +
pom.xml | 1 +
pulsar-io/docs/pom.xml | 5 +
pulsar-io/http/pom.xml | 87 ++++
.../java/org/apache/pulsar/io/http/HttpSink.java | 130 ++++++
.../org/apache/pulsar/io/http/HttpSinkConfig.java | 58 +++
.../org/apache/pulsar/io/http/JsonConverter.java | 241 +++++++++++
.../org/apache/pulsar/io/http/package-info.java | 19 +
.../resources/META-INF/services/pulsar-io.yaml | 23 ++
.../org/apache/pulsar/io/http/HttpSinkTest.java | 445 +++++++++++++++++++++
pulsar-io/pom.xml | 2 +
12 files changed, 1013 insertions(+)
diff --git a/deployment/terraform-ansible/deploy-pulsar.yaml b/deployment/terraform-ansible/deploy-pulsar.yaml
index ae1243ad66e..db2fd1257ca 100644
--- a/deployment/terraform-ansible/deploy-pulsar.yaml
+++ b/deployment/terraform-ansible/deploy-pulsar.yaml
@@ -154,6 +154,7 @@
# - jdbc-mariadb
# - jdbc-postgres
# - jdbc-sqlite
+# - http
- kafka
# - kafka-connect-adaptor
# - kinesis
diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index 7657b35ae09..33ca4e79ba9 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -47,6 +47,7 @@
<file><source>${basedir}/../../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/kafka/target/pulsar-io-kafka-${project.version}.nar</source></file>
+    <file><source>${basedir}/../../pulsar-io/http/target/pulsar-io-http-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/kinesis/target/pulsar-io-kinesis-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/rabbitmq/target/pulsar-io-rabbitmq-${project.version}.nar</source></file>
<file><source>${basedir}/../../pulsar-io/nsq/target/pulsar-io-nsq-${project.version}.nar</source></file>
diff --git a/pom.xml b/pom.xml
index 1121742ae31..53b7b2b2ee7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -248,6 +248,7 @@ flexible messaging model and an intuitive client API.</description>
<awaitility.version>4.2.0</awaitility.version>
<reload4j.version>1.2.22</reload4j.version>
<jettison.version>1.5.1</jettison.version>
+ <wiremock.version>2.33.2</wiremock.version>
<!-- Plugin dependencies -->
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml
index 3ed776035fc..dca0edbc759 100644
--- a/pulsar-io/docs/pom.xml
+++ b/pulsar-io/docs/pom.xml
@@ -157,6 +157,11 @@
<artifactId>pulsar-io-jdbc-openmldb</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-http</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-kafka</artifactId>
diff --git a/pulsar-io/http/pom.xml b/pulsar-io/http/pom.xml
new file mode 100644
index 00000000000..cd5a38d0ada
--- /dev/null
+++ b/pulsar-io/http/pom.xml
@@ -0,0 +1,87 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io</artifactId>
+ <version>2.11.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-io-http</artifactId>
+ <name>Pulsar IO :: HTTP</name>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jsr310</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.github.tomakehurst</groupId>
+ <artifactId>wiremock-jre8</artifactId>
+ <version>${wiremock.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSink.java b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSink.java
new file mode 100644
index 00000000000..31b5053ba7a
--- /dev/null
+++ b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSink.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Sink that makes a POST request to a configured HTTP endpoint for each record (webhook).
+ * The body of the HTTP request is the JSON representation of the record value.
+ * Some headers are added to the HTTP request:
+ * <ul>
+ *   <li>PulsarTopic: the topic of the record</li>
+ *   <li>PulsarKey: the key of the record</li>
+ *   <li>PulsarEventTime: the event time of the record</li>
+ *   <li>PulsarPublishTime: the publish time of the record</li>
+ *   <li>PulsarMessageId: the ID of the message contained in the record</li>
+ *   <li>PulsarProperties-*: each record property is passed with the property name prefixed by
+ *   PulsarProperties-</li>
+ * </ul>
+ * The headers configured in {@link HttpSinkConfig#getHeaders()} are added first, and a
+ * {@code Content-Type: application/json} header is added last.
+ */
+public class HttpSink implements Sink<GenericObject> {
+
+    HttpSinkConfig httpSinkConfig;
+    private HttpClient httpClient;
+    private ObjectMapper mapper;
+    private URI uri;
+
+    /**
+     * Loads the sink configuration and prepares the target URI, the HTTP client and the JSON mapper.
+     *
+     * @param config      raw connector configuration, parsed into a {@link HttpSinkConfig}
+     * @param sinkContext connector context (not used by this sink)
+     * @throws Exception if the configuration cannot be parsed or the configured URL is not a valid URI
+     */
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        httpSinkConfig = HttpSinkConfig.load(config);
+        uri = new URI(httpSinkConfig.getUrl());
+        httpClient = HttpClient.newHttpClient();
+        final ObjectMapper jsonMapper = new ObjectMapper();
+        jsonMapper.registerModule(new JavaTimeModule());
+        mapper = jsonMapper;
+    }
+
+    /**
+     * Serializes the record value to JSON and POSTs it synchronously to the configured endpoint.
+     * The record value is dereferenced unconditionally, so a record without a value results in a
+     * {@link NullPointerException}.
+     *
+     * @param record the record to forward
+     * @throws IOException if the endpoint answers with a status code outside the 2xx range
+     * @throws Exception   on serialization or transport failures
+     */
+    @Override
+    public void write(Record<GenericObject> record) throws Exception {
+        final Schema<?> recordSchema = record.getSchema();
+        final Object nativeValue = record.getValue().getNativeObject();
+        final byte[] body = mapper.writeValueAsBytes(toJsonSerializable(recordSchema, nativeValue));
+
+        final HttpRequest.Builder request = HttpRequest.newBuilder()
+                .uri(uri)
+                .POST(HttpRequest.BodyPublishers.ofByteArray(body));
+
+        // Header order is kept: configured headers, record properties, record metadata, content type.
+        for (Map.Entry<String, String> header : httpSinkConfig.getHeaders().entrySet()) {
+            request.header(header.getKey(), header.getValue());
+        }
+        for (Map.Entry<String, String> property : record.getProperties().entrySet()) {
+            request.header("PulsarProperties-" + property.getKey(), property.getValue());
+        }
+        record.getTopicName().ifPresent(topic -> request.header("PulsarTopic", topic));
+        record.getEventTime().ifPresent(eventTime -> request.header("PulsarEventTime", eventTime.toString()));
+        record.getKey().ifPresent(key -> request.header("PulsarKey", key));
+        record.getMessage().ifPresent(message -> {
+            if (message.getMessageId() != null) {
+                request.header("PulsarMessageId",
+                        Base64.getEncoder().encodeToString(message.getMessageId().toByteArray()));
+            }
+            final long publishTime = message.getPublishTime();
+            if (publishTime != 0) {
+                request.header("PulsarPublishTime", String.valueOf(publishTime));
+            }
+        });
+        request.header("Content-Type", "application/json");
+
+        final HttpResponse<String> response =
+                httpClient.send(request.build(), HttpResponse.BodyHandlers.ofString());
+        final int status = response.statusCode();
+        final boolean successful = status >= 200 && status < 300;
+        if (!successful) {
+            throw new IOException(String.format("HTTP call to %s failed with status code %s", uri, status));
+        }
+    }
+
+    /**
+     * Turns a record value into an object that the Jackson mapper can serialize.
+     * Values without a schema, with a primitive schema or with a JSON schema are returned as-is,
+     * AVRO records are converted with {@link JsonConverter}, and KEY_VALUE pairs become a map
+     * with the entries {@code key} and {@code value}.
+     *
+     * @param schema the schema describing {@code val}, may be {@code null}
+     * @param val    the native value
+     * @return the JSON-serializable representation of {@code val}
+     * @throws UnsupportedOperationException for any other schema type
+     */
+    private static Object toJsonSerializable(Schema<?> schema, Object val) {
+        if (schema == null || schema.getSchemaInfo().getType().isPrimitive()) {
+            return val;
+        }
+        switch (schema.getSchemaInfo().getType()) {
+            case AVRO:
+                return JsonConverter.toJson((org.apache.avro.generic.GenericRecord) val);
+            case JSON:
+                return val;
+            case KEY_VALUE:
+                return keyValueToJsonSerializable((KeyValueSchema<?, ?>) schema,
+                        (org.apache.pulsar.common.schema.KeyValue<?, ?>) val);
+            default:
+                throw new UnsupportedOperationException("Unsupported schema type ="
+                        + schema.getSchemaInfo().getType());
+        }
+    }
+
+    /** Converts both sides of a key/value pair, each with its own schema. */
+    private static Map<String, Object> keyValueToJsonSerializable(
+            KeyValueSchema<?, ?> keyValueSchema, org.apache.pulsar.common.schema.KeyValue<?, ?> keyValue) {
+        final Object rawKey = keyValue.getKey();
+        final Object rawValue = keyValue.getValue();
+        final Map<String, Object> json = new HashMap<>();
+        json.put("key", toJsonSerializable(keyValueSchema.getKeySchema(), unwrapGenericObject(rawKey)));
+        json.put("value", toJsonSerializable(keyValueSchema.getValueSchema(), unwrapGenericObject(rawValue)));
+        return json;
+    }
+
+    /** Returns the native object of a {@link GenericObject}, or the argument itself otherwise. */
+    private static Object unwrapGenericObject(Object candidate) {
+        return candidate instanceof GenericObject ? ((GenericObject) candidate).getNativeObject() : candidate;
+    }
+
+    /** Nothing to release explicitly. */
+    @Override
+    public void close() {}
+}
diff --git a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSinkConfig.java b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSinkConfig.java
new file mode 100644
index 00000000000..2113aec7206
--- /dev/null
+++ b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSinkConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Data;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+/**
+ * Configuration of the HTTP sink: the target URL and the headers added to every request.
+ */
+@Data
+@Accessors(chain = true)
+public class HttpSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @FieldDoc(
+        defaultValue = "http://localhost",
+        help = "The URL of the HTTP server")
+    private String url = "http://localhost";
+
+    @FieldDoc(
+        defaultValue = "",
+        help = "The list of default headers added to each request")
+    private Map<String, String> headers = new HashMap<>();
+
+    /**
+     * Reads the configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file
+     * @return the parsed configuration
+     * @throws IOException if the file cannot be read or parsed
+     */
+    public static HttpSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), HttpSinkConfig.class);
+    }
+
+    /**
+     * Builds the configuration from a generic key/value map by round-tripping it through JSON.
+     *
+     * @param map the raw connector configuration
+     * @return the parsed configuration
+     * @throws IOException if the map cannot be serialized or does not match this class
+     */
+    public static HttpSinkConfig load(Map<String, Object> map) throws IOException {
+        // One mapper serves both directions; a second instance is not needed.
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), HttpSinkConfig.class);
+    }
+}
diff --git a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/JsonConverter.java b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/JsonConverter.java
new file mode 100644
index 00000000000..65ae6e87604
--- /dev/null
+++ b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/JsonConverter.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.http;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * Convert an AVRO GenericRecord to a JsonNode.
+ */
+public class JsonConverter {
+
+    private static final Map<String, LogicalTypeConverter<?>> logicalTypeConverters = new HashMap<>();
+    private static final JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
+
+    /**
+     * Builds a new object node holding the top-level fields of {@code n1} followed by those of
+     * {@code n2}; on a name clash the value from {@code n2} wins. Nested objects are not merged.
+     *
+     * @param n1 first node
+     * @param n2 second node, takes precedence
+     * @return a new object node with the combined fields
+     */
+    public static JsonNode topLevelMerge(JsonNode n1, JsonNode n2) {
+        ObjectNode objectNode = jsonNodeFactory.objectNode();
+        copyTopLevelFields(n1, objectNode);
+        copyTopLevelFields(n2, objectNode);
+        return objectNode;
+    }
+
+    /** Copies every top-level field of {@code source} into {@code target}, overwriting existing names. */
+    private static void copyTopLevelFields(JsonNode source, ObjectNode target) {
+        Iterator<String> names = source.fieldNames();
+        while (names.hasNext()) {
+            String name = names.next();
+            // set() replaces the deprecated ObjectNode.put(String, JsonNode)
+            target.set(name, source.get(name));
+        }
+    }
+
+    /**
+     * Converts a record to an object node with one entry per field of the record schema.
+     *
+     * @param genericRecord the record, may be {@code null}
+     * @return the object node, or {@code null} if {@code genericRecord} is {@code null}
+     */
+    public static JsonNode toJson(GenericRecord genericRecord) {
+        if (genericRecord == null) {
+            return null;
+        }
+        ObjectNode objectNode = jsonNodeFactory.objectNode();
+        for (Schema.Field field : genericRecord.getSchema().getFields()) {
+            objectNode.set(field.name(), toJson(field.schema(), genericRecord.get(field.name())));
+        }
+        return objectNode;
+    }
+
+    /**
+     * Converts a single value to JSON according to its AVRO schema.
+     * A {@code null} value always yields a JSON null node. Otherwise, if the schema has a logical
+     * type with a registered converter, that converter is used; if not, the conversion follows the
+     * schema type.
+     *
+     * @param schema the AVRO schema of {@code value}
+     * @param value  the value to convert, may be {@code null}
+     * @return the JSON representation of {@code value}
+     * @throws UnsupportedOperationException if the schema type is not handled
+     * @throws IllegalArgumentException      if a logical-type converter rejects the Java type of the value
+     */
+    public static JsonNode toJson(Schema schema, Object value) {
+        // Checked first so that logical-type converters never have to deal with null.
+        if (value == null) {
+            return jsonNodeFactory.nullNode();
+        }
+        if (schema.getLogicalType() != null
+                && logicalTypeConverters.containsKey(schema.getLogicalType().getName())) {
+            return logicalTypeConverters.get(schema.getLogicalType().getName()).toJson(schema, value);
+        }
+        switch (schema.getType()) {
+            case NULL: // this should not happen
+                return jsonNodeFactory.nullNode();
+            case INT:
+                return jsonNodeFactory.numberNode((Integer) value);
+            case LONG:
+                return jsonNodeFactory.numberNode((Long) value);
+            case DOUBLE:
+                return jsonNodeFactory.numberNode((Double) value);
+            case FLOAT:
+                return jsonNodeFactory.numberNode((Float) value);
+            case BOOLEAN:
+                return jsonNodeFactory.booleanNode((Boolean) value);
+            case BYTES:
+                // AVRO generic data carries bytes as a ByteBuffer; plain byte[] is still accepted.
+                if (value instanceof java.nio.ByteBuffer) {
+                    // duplicate() keeps the position of the record's own buffer untouched
+                    java.nio.ByteBuffer buffer = ((java.nio.ByteBuffer) value).duplicate();
+                    byte[] bytes = new byte[buffer.remaining()];
+                    buffer.get(bytes);
+                    return jsonNodeFactory.binaryNode(bytes);
+                }
+                return jsonNodeFactory.binaryNode((byte[]) value);
+            case FIXED:
+                return jsonNodeFactory.binaryNode(((GenericFixed) value).bytes());
+            case ENUM: // GenericEnumSymbol
+            case STRING:
+                return jsonNodeFactory.textNode(value.toString()); // can be a String or org.apache.avro.util.Utf8
+            case ARRAY: {
+                Schema elementSchema = schema.getElementType();
+                ArrayNode arrayNode = jsonNodeFactory.arrayNode();
+                if (value instanceof Iterable) {
+                    // covers GenericData.Array as well as any other List/Collection
+                    for (Object elem : (Iterable<?>) value) {
+                        arrayNode.add(toJson(elementSchema, elem));
+                    }
+                } else {
+                    for (Object elem : (Object[]) value) {
+                        arrayNode.add(toJson(elementSchema, elem));
+                    }
+                }
+                return arrayNode;
+            }
+            case MAP: {
+                ObjectNode objectNode = jsonNodeFactory.objectNode();
+                for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
+                    JsonNode jsonNode = toJson(schema.getValueType(), entry.getValue());
+                    // can be a String or org.apache.avro.util.Utf8
+                    final String entryKey = entry.getKey() == null ? null : entry.getKey().toString();
+                    objectNode.set(entryKey, jsonNode);
+                }
+                return objectNode;
+            }
+            case RECORD:
+                return toJson((GenericRecord) value);
+            case UNION:
+                // Only the first non-null branch is considered; the value is not matched
+                // against the remaining branches of the union.
+                for (Schema s : schema.getTypes()) {
+                    if (s.getType() == Schema.Type.NULL) {
+                        continue;
+                    }
+                    return toJson(s, value);
+                }
+                // this case should not happen
+                return jsonNodeFactory.textNode(value.toString());
+            default:
+                throw new UnsupportedOperationException("Unknown AVRO schema type=" + schema.getType());
+        }
+    }
+
+    /**
+     * Converter for values whose schema carries an AVRO logical type.
+     * The conversion passed to the constructor is stored, but the converters registered in this
+     * class do not consult it.
+     *
+     * @param <T> the Java type associated with the logical type
+     */
+    abstract static class LogicalTypeConverter<T> {
+        final Conversion<T> conversion;
+
+        public LogicalTypeConverter(Conversion<T> conversion) {
+            this.conversion = conversion;
+        }
+
+        abstract JsonNode toJson(Schema schema, Object value);
+    }
+
+    // Registers one converter per supported logical type, keyed by the logical type name.
+    static {
+        logicalTypeConverters.put("decimal", new LogicalTypeConverter<BigDecimal>(
+                new Conversions.DecimalConversion()) {
+            @Override
+            JsonNode toJson(Schema schema, Object value) {
+                if (!(value instanceof BigDecimal)) {
+                    throw new IllegalArgumentException("Invalid type for Decimal, expected BigDecimal but was "
+                            + value.getClass());
+                }
+                BigDecimal decimal = (BigDecimal) value;
+                return jsonNodeFactory.numberNode(decimal);
+            }
+        });
+        logicalTypeConverters.put("date", new LogicalTypeConverter<LocalDate>(
+                new TimeConversions.DateConversion()) {
+            @Override
+            JsonNode toJson(Schema schema, Object value) {
+                if (!(value instanceof Integer)) {
+                    throw new IllegalArgumentException("Invalid type for date, expected Integer but was "
+                            + value.getClass());
+                }
+                Integer daysFromEpoch = (Integer) value;
+                return jsonNodeFactory.numberNode(daysFromEpoch);
+            }
+        });
+        logicalTypeConverters.put("time-millis", new LogicalTypeConverter<LocalTime>(
+                new TimeConversions.TimeMillisConversion()) {
+            @Override
+            JsonNode toJson(Schema schema, Object value) {
+                if (!(value instanceof Integer)) {
+                    throw new IllegalArgumentException("Invalid type for time-millis, expected Integer but was "
+                            + value.getClass());
+                }
+                Integer timeMillis = (Integer) value;
+                return jsonNodeFactory.numberNode(timeMillis);
+            }
+        });
+        logicalTypeConverters.put("time-micros", new LogicalTypeConverter<LocalTime>(
+                new TimeConversions.TimeMicrosConversion()) {
+            @Override
+            JsonNode toJson(Schema schema, Object value) {
+                if (!(value instanceof Long)) {
+                    throw new IllegalArgumentException("Invalid type for time-micros, expected Long but was "
+                            + value.getClass());
+                }
+                Long timeMicro = (Long) value;
+                return jsonNodeFactory.numberNode(timeMicro);
+            }
+        });
+        logicalTypeConverters.put("timestamp-millis", new LogicalTypeConverter<Instant>(
+                new TimeConversions.TimestampMillisConversion()) {
+            @Override
+            JsonNode toJson(Schema schema, Object value) {
+                if (!(value instanceof Long)) {
+                    throw new IllegalArgumentException("Invalid type for timestamp-millis, expected Long but was "
+                            + value.getClass());
+                }
+                Long epochMillis = (Long) value;
+                return jsonNodeFactory.numberNode(epochMillis);
+            }
+        });
+        logicalTypeConverters.put("timestamp-micros", new LogicalTypeConverter<Instant>(
+                new TimeConversions.TimestampMicrosConversion()) {
+            @Override
+            JsonNode toJson(Schema schema, Object value) {
+                if (!(value instanceof Long)) {
+                    throw new IllegalArgumentException("Invalid type for timestamp-micros, expected Long but was "
+                            + value.getClass());
+                }
+                Long epochMicros = (Long) value;
+                return jsonNodeFactory.numberNode(epochMicros);
+            }
+        });
+        logicalTypeConverters.put("uuid", new LogicalTypeConverter<UUID>(
+                new Conversions.UUIDConversion()) {
+            @Override
+            JsonNode toJson(Schema schema, Object value) {
+                return jsonNodeFactory.textNode(value == null ? null : value.toString());
+            }
+        });
+    }
+
+    /**
+     * Collects the values of the selected top-level fields into an array node.
+     * The values appear in the field order of {@code jsonNode}, not in the order of {@code fields}.
+     *
+     * @param jsonNode the node to read the fields from
+     * @param fields   names of the fields to keep
+     * @return an array node holding the values of the matching fields
+     */
+    public static ArrayNode toJsonArray(JsonNode jsonNode, List<String> fields) {
+        ArrayNode arrayNode = jsonNodeFactory.arrayNode();
+        Iterator<String> it = jsonNode.fieldNames();
+        while (it.hasNext()) {
+            String fieldName = it.next();
+            if (fields.contains(fieldName)) {
+                arrayNode.add(jsonNode.get(fieldName));
+            }
+        }
+        return arrayNode;
+    }
+
+}
diff --git a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/package-info.java b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/package-info.java
new file mode 100644
index 00000000000..87131fdf3be
--- /dev/null
+++ b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.http;
diff --git a/pulsar-io/http/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/http/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 00000000000..bdd1712b924
--- /dev/null
+++ b/pulsar-io/http/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: http
+description: Writes data to an HTTP server (Webhook)
+sinkClass: org.apache.pulsar.io.http.HttpSink
+sinkConfigClass: org.apache.pulsar.io.http.HttpSinkConfig
diff --git a/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java b/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
new file mode 100644
index 00000000000..d5de27d6284
--- /dev/null
+++ b/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.http;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.configureFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import java.io.IOException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Record;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class HttpSinkTest {
+
+ WireMockServer server;
+
+ @BeforeClass
+ public void setUp() {
+ server = new WireMockServer(0);
+ server.start();
+ configureFor(server.port());
+ stubFor(post(urlPathEqualTo("/"))
+ .willReturn(aResponse().withStatus(200)));
+ }
+
+ @AfterClass
+ public void tearDown() {
+ server.stop();
+ }
+
+ @DataProvider(name = "primitives")
+ public Object[][] primitives() {
+ return new Object[][]{
+ new Object[] {Schema.STRING, "test-string", "\"test-string\""},
+ new Object[] {Schema.INT8, (byte) 42, "42"},
+ new Object[] {Schema.INT16, (short) 42, "42"},
+ new Object[] {Schema.INT32, 42, "42"},
+ new Object[] {Schema.INT64, 42L, "42"},
+ new Object[] {Schema.BOOL, true, "true"},
+ new Object[] {Schema.FLOAT, 0.1F, "0.1"},
+ new Object[] {Schema.DOUBLE, 0.1D, "0.1"},
+ new Object[] {Schema.DATE, new Date(1662418008047L),
"1662418008047"},
+ new Object[] {Schema.TIME, new Time(0, 46, 48), "\"00:46:48\""},
+ new Object[] {Schema.TIMESTAMP, new Timestamp(1662418008047L),
"1662418008047"},
+ new Object[] {Schema.INSTANT,
Instant.ofEpochMilli(1662418008047L), "1662418008.047000000"},
+ new Object[] {Schema.LOCAL_DATE, LocalDate.of(2022, 1, 1),
"[2022,1,1]"},
+ new Object[] {Schema.LOCAL_TIME, LocalTime.of(11, 12), "[11,12]"},
+ new Object[] {Schema.LOCAL_DATE_TIME, new
Timestamp(1662418008047L), "1662418008047"},
+ };
+ }
+
+ @Test(dataProvider = "primitives")
+ public void testPrimitives(Schema<?> schema, Object value, String
responseBody) throws Exception {
+ GenericObject genericObject = new GenericObject() {
+ @Override
+ public SchemaType getSchemaType() {
+ return null;
+ }
+
+ @Override
+ public Object getNativeObject() {
+ return value;
+ }
+ };
+ test(schema, genericObject, responseBody);
+ }
+
+ @DataProvider(name = "schema")
+ public Object[][] schema() {
+ return new Object[][]{
+ new Object[]{Schema.JSON(Object.class)},
+ new Object[]{Schema.AVRO(Object.class)},
+ };
+ }
+
+ @Test(dataProvider = "schema")
+ public void testGenericRecord(Schema<?> schema) throws Exception {
+ SchemaType schemaType = schema.getSchemaInfo().getType();
+ RecordSchemaBuilder valueSchemaBuilder =
org.apache.pulsar.client.api.schema.SchemaBuilder.record("value");
+
valueSchemaBuilder.field("c").type(SchemaType.STRING).optional().defaultValue(null);
+
valueSchemaBuilder.field("d").type(SchemaType.INT32).optional().defaultValue(null);
+ RecordSchemaBuilder udtSchemaBuilder = SchemaBuilder.record("type1");
+
udtSchemaBuilder.field("a").type(SchemaType.STRING).optional().defaultValue(null);
+
udtSchemaBuilder.field("b").type(SchemaType.BOOLEAN).optional().defaultValue(null);
+
udtSchemaBuilder.field("d").type(SchemaType.DOUBLE).optional().defaultValue(null);
+
udtSchemaBuilder.field("f").type(SchemaType.FLOAT).optional().defaultValue(null);
+
udtSchemaBuilder.field("i").type(SchemaType.INT32).optional().defaultValue(null);
+
udtSchemaBuilder.field("l").type(SchemaType.INT64).optional().defaultValue(null);
+ GenericSchema<GenericRecord> udtGenericSchema =
Schema.generic(udtSchemaBuilder.build(schemaType));
+ valueSchemaBuilder.field("e",
udtGenericSchema).type(schemaType).optional().defaultValue(null);
+ GenericSchema<GenericRecord> valueSchema =
Schema.generic(valueSchemaBuilder.build(schemaType));
+
+ GenericRecord valueGenericRecord = valueSchema.newRecordBuilder()
+ .set("c", "1")
+ .set("d", 1)
+ .set("e", udtGenericSchema.newRecordBuilder()
+ .set("a", "a")
+ .set("b", true)
+ .set("d", 1.0)
+ .set("f", 1.0f)
+ .set("i", 1)
+ .set("l", 10L)
+ .build())
+ .build();
+
+ String responseBody =
+
"{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}";
+ test(schema, valueGenericRecord, responseBody);
+ }
+
+ @Test
+ public void testKeyValuePrimitives() throws Exception {
+ Schema<KeyValue<String, String>> keyValueSchema =
KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING);
+ GenericObject genericObject = new GenericObject() {
+ @Override
+ public SchemaType getSchemaType() {
+ return null;
+ }
+
+ @Override
+ public Object getNativeObject() {
+ return new KeyValue<>("test-key", "test-value");
+ }
+ };
+ test(keyValueSchema, genericObject,
"{\"value\":\"test-value\",\"key\":\"test-key\"}");
+ }
+
+ @Test(dataProvider = "schema")
+ public void testKeyValueGenericRecord(Schema<?> schema) throws Exception {
+ SchemaType schemaType = schema.getSchemaInfo().getType();
+ RecordSchemaBuilder keySchemaBuilder =
org.apache.pulsar.client.api.schema.SchemaBuilder.record("key");
+
keySchemaBuilder.field("a").type(SchemaType.STRING).optional().defaultValue(null);
+
keySchemaBuilder.field("b").type(SchemaType.INT32).optional().defaultValue(null);
+ GenericSchema<GenericRecord> keySchema =
Schema.generic(keySchemaBuilder.build(schemaType));
+ GenericRecord keyGenericRecord = keySchema.newRecordBuilder()
+ .set("a", "1")
+ .set("b", 1)
+ .build();
+
+ RecordSchemaBuilder valueSchemaBuilder =
org.apache.pulsar.client.api.schema.SchemaBuilder.record("value");
+
valueSchemaBuilder.field("c").type(SchemaType.STRING).optional().defaultValue(null);
+
valueSchemaBuilder.field("d").type(SchemaType.INT32).optional().defaultValue(null);
+ RecordSchemaBuilder udtSchemaBuilder = SchemaBuilder.record("type1");
+
udtSchemaBuilder.field("a").type(SchemaType.STRING).optional().defaultValue(null);
+
udtSchemaBuilder.field("b").type(SchemaType.BOOLEAN).optional().defaultValue(null);
+
udtSchemaBuilder.field("d").type(SchemaType.DOUBLE).optional().defaultValue(null);
+
udtSchemaBuilder.field("f").type(SchemaType.FLOAT).optional().defaultValue(null);
+
udtSchemaBuilder.field("i").type(SchemaType.INT32).optional().defaultValue(null);
+
udtSchemaBuilder.field("l").type(SchemaType.INT64).optional().defaultValue(null);
+ GenericSchema<GenericRecord> udtGenericSchema =
Schema.generic(udtSchemaBuilder.build(schemaType));
+ valueSchemaBuilder.field("e",
udtGenericSchema).type(schemaType).optional().defaultValue(null);
+ GenericSchema<GenericRecord> valueSchema =
Schema.generic(valueSchemaBuilder.build(schemaType));
+
+ GenericRecord valueGenericRecord = valueSchema.newRecordBuilder()
+ .set("c", "1")
+ .set("d", 1)
+ .set("e", udtGenericSchema.newRecordBuilder()
+ .set("a", "a")
+ .set("b", true)
+ .set("d", 1.0)
+ .set("f", 1.0f)
+ .set("i", 1)
+ .set("l", 10L)
+ .build())
+ .build();
+
+ Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema =
Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
+ KeyValue<GenericRecord, GenericRecord> keyValue = new
KeyValue<>(keyGenericRecord, valueGenericRecord);
+ GenericObject genericObject = new GenericObject() {
+ @Override
+ public SchemaType getSchemaType() {
+ return SchemaType.KEY_VALUE;
+ }
+
+ @Override
+ public Object getNativeObject() {
+ return keyValue;
+ }
+ };
+ String responseBody =
"{\"value\":{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,"
+ + "\"i\":1,\"l\":10}},\"key\":{\"a\":\"1\",\"b\":1}}";
+ test(keyValueSchema, genericObject, responseBody);
+ }
+
+ private void test(Schema<?> schema, GenericObject genericObject, String
responseBody) throws Exception {
+ HttpSink httpSink = new HttpSink();
+ Map<String, Object> config = new HashMap<>();
+ config.put("url", server.baseUrl());
+ Map<String, String> headers = new HashMap<>();
+ headers.put("header-name", "header-value");
+ config.put("headers", headers);
+ httpSink.open(config, null);
+
+ long now = 1662418008000L;
+ Map<String, String> messageProperties = new HashMap<>();
+ messageProperties.put("prop-name", "prop-value");
+
+ Record<GenericObject> record = new Record<>() {
+ @Override
+ public GenericObject getValue() {
+ return genericObject;
+ }
+
+ @Override
+ public Schema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public Optional<Long> getEventTime() {
+ return Optional.of(now);
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return messageProperties;
+ }
+
+ @Override
+ public Optional<String> getTopicName() {
+ return Optional.of("test-topic");
+ }
+
+ @Override
+ public Optional<String> getKey() {
+ return Optional.of("test-key");
+ }
+
+ @Override
+ public Optional<Message<GenericObject>> getMessage() {
+ return Optional.of(new Message<>() {
+
+ @Override
+ public Map<String, String> getProperties() {
+ return null;
+ }
+
+ @Override
+ public boolean hasProperty(String name) {
+ return false;
+ }
+
+ @Override
+ public String getProperty(String name) {
+ return null;
+ }
+
+ @Override
+ public byte[] getData() {
+ return new byte[0];
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public GenericObject getValue() {
+ return null;
+ }
+
+ @Override
+ public MessageId getMessageId() {
+ return new MessageIdImpl(1, 2, 3);
+ }
+
+ @Override
+ public long getPublishTime() {
+ return now + 1;
+ }
+
+ @Override
+ public long getEventTime() {
+ return 0;
+ }
+
+ @Override
+ public long getSequenceId() {
+ return 0;
+ }
+
+ @Override
+ public String getProducerName() {
+ return null;
+ }
+
+ @Override
+ public boolean hasKey() {
+ return false;
+ }
+
+ @Override
+ public String getKey() {
+ return null;
+ }
+
+ @Override
+ public boolean hasBase64EncodedKey() {
+ return false;
+ }
+
+ @Override
+ public byte[] getKeyBytes() {
+ return new byte[0];
+ }
+
+ @Override
+ public boolean hasOrderingKey() {
+ return false;
+ }
+
+ @Override
+ public byte[] getOrderingKey() {
+ return new byte[0];
+ }
+
+ @Override
+ public String getTopicName() {
+ return null;
+ }
+
+ @Override
+ public Optional<EncryptionContext> getEncryptionCtx() {
+ return Optional.empty();
+ }
+
+ @Override
+ public int getRedeliveryCount() {
+ return 0;
+ }
+
+ @Override
+ public byte[] getSchemaVersion() {
+ return new byte[0];
+ }
+
+ @Override
+ public boolean isReplicated() {
+ return false;
+ }
+
+ @Override
+ public String getReplicatedFrom() {
+ return null;
+ }
+
+ @Override
+ public void release() {
+
+ }
+
+ @Override
+ public boolean hasBrokerPublishTime() {
+ return false;
+ }
+
+ @Override
+ public Optional<Long> getBrokerPublishTime() {
+ return Optional.empty();
+ }
+
+ @Override
+ public boolean hasIndex() {
+ return false;
+ }
+
+ @Override
+ public Optional<Long> getIndex() {
+ return Optional.empty();
+ }
+ });
+ }
+ };
+ httpSink.write(record);
+
+ verify(postRequestedFor(urlEqualTo("/"))
+ .withRequestBody(equalTo(responseBody))
+ .withHeader("Content-Type", equalTo("application/json"))
+ .withHeader("header-name", equalTo("header-value"))
+ .withHeader("PulsarTopic", equalTo("test-topic"))
+ .withHeader("PulsarKey", equalTo("test-key"))
+ .withHeader("PulsarEventTime", equalTo("1662418008000"))
+ .withHeader("PulsarPublishTime", equalTo("1662418008001"))
+ .withHeader("PulsarMessageId", equalTo("CAEQAhgDMAA="))
+ .withHeader("PulsarProperties-prop-name", equalTo("prop-value"))
+ );
+ }
+
+ @Test(expectedExceptions = IOException.class)
+ public void testRequestFailure() throws Exception {
+ stubFor(post(urlPathEqualTo("/"))
+ .willReturn(aResponse().withStatus(500)));
+
+ testKeyValuePrimitives();
+ }
+}
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index b2c400117a5..a5e096aff59 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -51,6 +51,7 @@
<module>twitter</module>
<module>cassandra</module>
<module>aerospike</module>
+ <module>http</module>
<module>kafka</module>
<module>rabbitmq</module>
<module>kinesis</module>
@@ -88,6 +89,7 @@
<module>twitter</module>
<module>cassandra</module>
<module>aerospike</module>
+ <module>http</module>
<module>kafka</module>
<module>rabbitmq</module>
<module>kinesis</module>