This is an automated email from the ASF dual-hosted git repository.

cbornet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b193051bb70 Add HTTP Sink (#17581)
b193051bb70 is described below

commit b193051bb70f0432bd68cc64b9025a653abab3d8
Author: Christophe Bornet <[email protected]>
AuthorDate: Thu Oct 27 20:05:29 2022 +0200

    Add HTTP Sink (#17581)
---
 deployment/terraform-ansible/deploy-pulsar.yaml    |   1 +
 distribution/io/src/assemble/io.xml                |   1 +
 pom.xml                                            |   1 +
 pulsar-io/docs/pom.xml                             |   5 +
 pulsar-io/http/pom.xml                             |  87 ++++
 .../java/org/apache/pulsar/io/http/HttpSink.java   | 130 ++++++
 .../org/apache/pulsar/io/http/HttpSinkConfig.java  |  58 +++
 .../org/apache/pulsar/io/http/JsonConverter.java   | 241 +++++++++++
 .../org/apache/pulsar/io/http/package-info.java    |  19 +
 .../resources/META-INF/services/pulsar-io.yaml     |  23 ++
 .../org/apache/pulsar/io/http/HttpSinkTest.java    | 445 +++++++++++++++++++++
 pulsar-io/pom.xml                                  |   2 +
 12 files changed, 1013 insertions(+)

diff --git a/deployment/terraform-ansible/deploy-pulsar.yaml 
b/deployment/terraform-ansible/deploy-pulsar.yaml
index ae1243ad66e..db2fd1257ca 100644
--- a/deployment/terraform-ansible/deploy-pulsar.yaml
+++ b/deployment/terraform-ansible/deploy-pulsar.yaml
@@ -154,6 +154,7 @@
 #        - jdbc-mariadb
 #        - jdbc-postgres
 #        - jdbc-sqlite
+#        - http
         - kafka
 #        - kafka-connect-adaptor
 #        - kinesis
diff --git a/distribution/io/src/assemble/io.xml 
b/distribution/io/src/assemble/io.xml
index 7657b35ae09..33ca4e79ba9 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -47,6 +47,7 @@
     
<file><source>${basedir}/../../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar</source></file>
     
<file><source>${basedir}/../../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar</source></file>
     
<file><source>${basedir}/../../pulsar-io/kafka/target/pulsar-io-kafka-${project.version}.nar</source></file>
+    
<file><source>${basedir}/../../pulsar-io/http/target/pulsar-io-http-${project.version}.nar</source></file>
     
<file><source>${basedir}/../../pulsar-io/kinesis/target/pulsar-io-kinesis-${project.version}.nar</source></file>
     
<file><source>${basedir}/../../pulsar-io/rabbitmq/target/pulsar-io-rabbitmq-${project.version}.nar</source></file>
     
<file><source>${basedir}/../../pulsar-io/nsq/target/pulsar-io-nsq-${project.version}.nar</source></file>
diff --git a/pom.xml b/pom.xml
index 1121742ae31..53b7b2b2ee7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -248,6 +248,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <awaitility.version>4.2.0</awaitility.version>
     <reload4j.version>1.2.22</reload4j.version>
     <jettison.version>1.5.1</jettison.version>
+    <wiremock.version>2.33.2</wiremock.version>
 
     <!-- Plugin dependencies -->
     <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
diff --git a/pulsar-io/docs/pom.xml b/pulsar-io/docs/pom.xml
index 3ed776035fc..dca0edbc759 100644
--- a/pulsar-io/docs/pom.xml
+++ b/pulsar-io/docs/pom.xml
@@ -157,6 +157,11 @@
       <artifactId>pulsar-io-jdbc-openmldb</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-http</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-kafka</artifactId>
diff --git a/pulsar-io/http/pom.xml b/pulsar-io/http/pom.xml
new file mode 100644
index 00000000000..cd5a38d0ada
--- /dev/null
+++ b/pulsar-io/http/pom.xml
@@ -0,0 +1,87 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.pulsar</groupId>
+        <artifactId>pulsar-io</artifactId>
+        <version>2.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>pulsar-io-http</artifactId>
+    <name>Pulsar IO :: HTTP</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-io-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-original</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.tomakehurst</groupId>
+            <artifactId>wiremock-jre8</artifactId>
+            <version>${wiremock.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-nar-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSink.java 
b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSink.java
new file mode 100644
index 00000000000..31b5053ba7a
--- /dev/null
+++ b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSink.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * A Sink that makes a POST request to a configured HTTP endpoint for each 
record (webhook).
+ * The body of the HTTP request is the JSON representation of the record value.
+ * Some headers are added to the HTTP request:
+ * <ul>
+ *   <li>PulsarTopic: the topic of the record</li>
+ *   <li>PulsarKey: the key of the record</li>
+ *   <li>PulsarEventTime: the event time of the record</li>
+ *   <li>PulsarPublishTime: the publish time of the record</li>
+ *   <li>PulsarMessageId: the ID of the message contained in the record</li>
+ *   <li>PulsarProperties-*: each record property is passed with the property 
name prefixed by PulsarProperties-</li>
+ * </ul>
+ */
+public class HttpSink implements Sink<GenericObject> {
+
+    HttpSinkConfig httpSinkConfig;
+    private HttpClient httpClient;
+    private ObjectMapper mapper;
+    private URI uri;
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
+        httpSinkConfig = HttpSinkConfig.load(config);
+        uri = new URI(httpSinkConfig.getUrl());
+        httpClient = HttpClient.newHttpClient();
+        mapper = new ObjectMapper().registerModule(new JavaTimeModule());
+    }
+
+    @Override
+    public void write(Record<GenericObject> record) throws Exception {
+        Object json = toJsonSerializable(record.getSchema(), 
record.getValue().getNativeObject());
+        byte[] bytes = mapper.writeValueAsBytes(json);
+        HttpRequest.Builder builder = HttpRequest.newBuilder()
+            .uri(uri)
+            .POST(HttpRequest.BodyPublishers.ofByteArray(bytes));
+        httpSinkConfig.getHeaders().forEach(builder::header);
+        record.getProperties().forEach((k, v) -> 
builder.header("PulsarProperties-" + k, v));
+        record.getTopicName().ifPresent(topic -> builder.header("PulsarTopic", 
topic));
+        record.getEventTime().ifPresent(eventTime -> 
builder.header("PulsarEventTime", eventTime.toString()));
+        record.getKey().ifPresent(key -> builder.header("PulsarKey", key));
+        record.getMessage().ifPresent(
+            message -> {
+              if (message.getMessageId() != null) {
+                String messageId = 
Base64.getEncoder().encodeToString(message.getMessageId().toByteArray());
+                builder.header("PulsarMessageId", messageId);
+              }
+              if (message.getPublishTime() != 0) {
+                builder.header("PulsarPublishTime", 
String.valueOf(message.getPublishTime()));
+              }
+            }
+        );
+        builder.header("Content-Type", "application/json");
+
+        HttpResponse<String> response = httpClient.send(builder.build(), 
HttpResponse.BodyHandlers.ofString());
+
+        if (response.statusCode() < 200 || response.statusCode() >= 300) {
+            throw new IOException(
+                String.format("HTTP call to %s failed with status code %s", 
uri, response.statusCode()));
+        }
+
+    }
+
+    private static Object toJsonSerializable(Schema<?> schema, Object val) {
+        if (schema == null || schema.getSchemaInfo().getType().isPrimitive()) {
+            return val;
+        }
+        switch (schema.getSchemaInfo().getType()) {
+            case KEY_VALUE:
+                KeyValueSchema<?, ?> keyValueSchema = (KeyValueSchema<?, ?>) 
schema;
+                org.apache.pulsar.common.schema.KeyValue<?, ?> keyValue =
+                    (org.apache.pulsar.common.schema.KeyValue<?, ?>) val;
+                Map<String, Object> jsonKeyValue = new HashMap<>();
+                Object key = keyValue.getKey();
+                Object value = keyValue.getValue();
+                jsonKeyValue.put("key", 
toJsonSerializable(keyValueSchema.getKeySchema(),
+                    key instanceof GenericObject ? ((GenericObject) 
key).getNativeObject() : key));
+                jsonKeyValue.put("value", 
toJsonSerializable(keyValueSchema.getValueSchema(),
+                    value instanceof GenericObject ? ((GenericObject) 
value).getNativeObject() : value));
+                return jsonKeyValue;
+            case AVRO:
+                return 
JsonConverter.toJson((org.apache.avro.generic.GenericRecord) val);
+            case JSON:
+                return val;
+            default:
+                throw new UnsupportedOperationException("Unsupported schema 
type ="
+                    + schema.getSchemaInfo().getType());
+        }
+    }
+
+    @Override
+    public void close() {}
+}
diff --git 
a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSinkConfig.java 
b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSinkConfig.java
new file mode 100644
index 00000000000..2113aec7206
--- /dev/null
+++ b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSinkConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Data;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+/**
+ * Configuration of the HTTP sink: the target URL and the default headers added to each request.
+ */
+@Data
+@Accessors(chain = true)
+public class HttpSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @FieldDoc(
+        defaultValue = "http://localhost",
+        help = "The URL of the HTTP server")
+    private String url = "http://localhost";
+
+    @FieldDoc(
+        defaultValue = "",
+        help = "The list of default headers added to each request")
+    private Map<String, String> headers = new HashMap<>();
+
+    /**
+     * Reads the configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file
+     * @return the parsed configuration
+     * @throws IOException if the file cannot be read or parsed
+     */
+    public static HttpSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), HttpSinkConfig.class);
+    }
+
+    /**
+     * Builds the configuration from a key/value map by round-tripping it through JSON.
+     *
+     * @param map the raw connector configuration
+     * @return the parsed configuration
+     * @throws IOException if the map cannot be serialized or does not match this class
+     */
+    public static HttpSinkConfig load(Map<String, Object> map) throws IOException {
+        // One mapper is enough for both the serialization and the deserialization step.
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), HttpSinkConfig.class);
+    }
+}
diff --git 
a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/JsonConverter.java 
b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/JsonConverter.java
new file mode 100644
index 00000000000..65ae6e87604
--- /dev/null
+++ b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/JsonConverter.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.http;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * Convert an AVRO GenericRecord to a JsonNode.
+ */
+public class JsonConverter {
+
+    private static final Map<String, LogicalTypeConverter<?>> 
logicalTypeConverters = new HashMap<>();
+    private static final JsonNodeFactory jsonNodeFactory = 
JsonNodeFactory.withExactBigDecimals(true);
+
+    public static JsonNode topLevelMerge(JsonNode n1, JsonNode n2) {
+        ObjectNode objectNode = jsonNodeFactory.objectNode();
+        n1.fieldNames().forEachRemaining(f -> objectNode.put(f, n1.get(f)));
+        n2.fieldNames().forEachRemaining(f -> objectNode.put(f, n2.get(f)));
+        return objectNode;
+    }
+
+    public static JsonNode toJson(GenericRecord genericRecord) {
+        if (genericRecord == null) {
+            return null;
+        }
+        ObjectNode objectNode = jsonNodeFactory.objectNode();
+        for (Schema.Field field : genericRecord.getSchema().getFields()) {
+            objectNode.set(field.name(), toJson(field.schema(), 
genericRecord.get(field.name())));
+        }
+        return objectNode;
+    }
+
+    public static JsonNode toJson(Schema schema, Object value) {
+        if (schema.getLogicalType() != null && 
logicalTypeConverters.containsKey(schema.getLogicalType().getName())) {
+            return 
logicalTypeConverters.get(schema.getLogicalType().getName()).toJson(schema, 
value);
+        }
+        if (value == null) {
+            return jsonNodeFactory.nullNode();
+        }
+        switch(schema.getType()) {
+            case NULL: // this should not happen
+                return jsonNodeFactory.nullNode();
+            case INT:
+                return jsonNodeFactory.numberNode((Integer) value);
+            case LONG:
+                return jsonNodeFactory.numberNode((Long) value);
+            case DOUBLE:
+                return jsonNodeFactory.numberNode((Double) value);
+            case FLOAT:
+                return jsonNodeFactory.numberNode((Float) value);
+            case BOOLEAN:
+                return jsonNodeFactory.booleanNode((Boolean) value);
+            case BYTES:
+                return jsonNodeFactory.binaryNode((byte[]) value);
+            case FIXED:
+                return jsonNodeFactory.binaryNode(((GenericFixed) 
value).bytes());
+            case ENUM: // GenericEnumSymbol
+            case STRING:
+                return jsonNodeFactory.textNode(value.toString()); // can be a 
String or org.apache.avro.util.Utf8
+            case ARRAY: {
+                Schema elementSchema = schema.getElementType();
+                ArrayNode arrayNode = jsonNodeFactory.arrayNode();
+                Object[] iterable;
+                if (value instanceof GenericData.Array) {
+                    iterable = ((GenericData.Array) value).toArray();
+                } else {
+                    iterable = (Object[]) value;
+                }
+                for (Object elem : iterable) {
+                    JsonNode fieldValue = toJson(elementSchema, elem);
+                    arrayNode.add(fieldValue);
+                }
+                return arrayNode;
+            }
+            case MAP: {
+                Map<Object, Object> map = (Map<Object, Object>) value;
+                ObjectNode objectNode = jsonNodeFactory.objectNode();
+                for (Map.Entry<Object, Object> entry : map.entrySet()) {
+                    JsonNode jsonNode = toJson(schema.getValueType(), 
entry.getValue());
+                    // can be a String or org.apache.avro.util.Utf8
+                    final String entryKey = entry.getKey() == null ? null : 
entry.getKey().toString();
+                    objectNode.set(entryKey, jsonNode);
+                }
+                return objectNode;
+            }
+            case RECORD:
+                return toJson((GenericRecord) value);
+            case UNION:
+                for (Schema s : schema.getTypes()) {
+                    if (s.getType() == Schema.Type.NULL) {
+                        continue;
+                    }
+                    return toJson(s, value);
+                }
+                // this case should not happen
+                return jsonNodeFactory.textNode(value.toString());
+            default:
+                throw new UnsupportedOperationException("Unknown AVRO schema 
type=" + schema.getType());
+        }
+    }
+
+    abstract static class LogicalTypeConverter<T> {
+        final Conversion<T> conversion;
+
+        public LogicalTypeConverter(Conversion<T> conversion) {
+            this.conversion = conversion;
+        }
+
+        abstract JsonNode toJson(Schema schema, Object value);
+    }
+
+    static {
+        logicalTypeConverters.put("decimal", new 
LogicalTypeConverter<BigDecimal>(
+                new Conversions.DecimalConversion()) {
+            @Override
+            JsonNode toJson(Schema schema, Object value) {
+                if (!(value instanceof BigDecimal)) {
+                    throw new IllegalArgumentException("Invalid type for 
Decimal, expected BigDecimal but was "
+                            + value.getClass());
+                }
+                BigDecimal decimal = (BigDecimal) value;
+                return jsonNodeFactory.numberNode(decimal);
+            }
+        });
+        logicalTypeConverters.put("date", new LogicalTypeConverter<LocalDate>(
+                new TimeConversions.DateConversion()) {
+            @Override
+            JsonNode toJson(Schema schema, Object value) {
+                if (!(value instanceof Integer)) {
+                    throw new IllegalArgumentException("Invalid type for date, 
expected Integer but was "
+                            + value.getClass());
+                }
+                Integer daysFromEpoch = (Integer) value;
+                return jsonNodeFactory.numberNode(daysFromEpoch);
+            }
+        });
+        logicalTypeConverters.put("time-millis", new 
LogicalTypeConverter<LocalTime>(
+                new TimeConversions.TimeMillisConversion()) {
+            @Override
+            JsonNode toJson(Schema schema, Object value) {
+                if (!(value instanceof Integer)) {
+                    throw new IllegalArgumentException("Invalid type for 
time-millis, expected Integer but was "
+                            + value.getClass());
+                }
+                Integer timeMillis = (Integer) value;
+                return jsonNodeFactory.numberNode(timeMillis);
+            }
+        });
+        logicalTypeConverters.put("time-micros", new 
LogicalTypeConverter<LocalTime>(
+                new TimeConversions.TimeMicrosConversion()) {
+            @Override
+            JsonNode toJson(Schema schema, Object value) {
+                if (!(value instanceof Long)) {
+                    throw new IllegalArgumentException("Invalid type for 
time-micros, expected Long but was "
+                            + value.getClass());
+                }
+                Long timeMicro = (Long) value;
+                return jsonNodeFactory.numberNode(timeMicro);
+            }
+        });
+        logicalTypeConverters.put("timestamp-millis", new 
LogicalTypeConverter<Instant>(
+                new TimeConversions.TimestampMillisConversion()) {
+            @Override
+            JsonNode toJson(Schema schema, Object value) {
+                if (!(value instanceof Long)) {
+                    throw new IllegalArgumentException("Invalid type for 
timestamp-millis, expected Long but was "
+                            + value.getClass());
+                }
+                Long epochMillis = (Long) value;
+                return jsonNodeFactory.numberNode(epochMillis);
+            }
+        });
+        logicalTypeConverters.put("timestamp-micros", new 
LogicalTypeConverter<Instant>(
+                new TimeConversions.TimestampMicrosConversion()) {
+            @Override
+            JsonNode toJson(Schema schema, Object value) {
+                if (!(value instanceof Long)) {
+                    throw new IllegalArgumentException("Invalid type for 
timestamp-micros, expected Long but was "
+                            + value.getClass());
+                }
+                Long epochMillis = (Long) value;
+                return jsonNodeFactory.numberNode(epochMillis);
+            }
+        });
+        logicalTypeConverters.put("uuid", new LogicalTypeConverter<UUID>(
+                new Conversions.UUIDConversion()) {
+            @Override
+            JsonNode toJson(Schema schema, Object value) {
+                return jsonNodeFactory.textNode(value == null ? null : 
value.toString());
+            }
+        });
+    }
+
+    public static ArrayNode toJsonArray(JsonNode jsonNode, List<String> 
fields) {
+        ArrayNode arrayNode = jsonNodeFactory.arrayNode();
+        Iterator<String> it = jsonNode.fieldNames();
+        while (it.hasNext()) {
+            String fieldName = it.next();
+            if (fields.contains(fieldName)) {
+                arrayNode.add(jsonNode.get(fieldName));
+            }
+        }
+        return arrayNode;
+    }
+
+}
diff --git 
a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/package-info.java 
b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/package-info.java
new file mode 100644
index 00000000000..87131fdf3be
--- /dev/null
+++ b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.http;
diff --git a/pulsar-io/http/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/http/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 00000000000..bdd1712b924
--- /dev/null
+++ b/pulsar-io/http/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: http
+description: Writes data to an HTTP server (Webhook)
+sinkClass: org.apache.pulsar.io.http.HttpSink
+sinkConfigClass: org.apache.pulsar.io.http.HttpSinkConfig
diff --git 
a/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java 
b/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
new file mode 100644
index 00000000000..d5de27d6284
--- /dev/null
+++ b/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.http;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.configureFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import java.io.IOException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Record;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class HttpSinkTest {
+
+    WireMockServer server;
+
+    /**
+     * Starts the WireMock server shared by every test in this class, points the static WireMock
+     * client at it and registers a default stub answering {@code POST /} with HTTP 200.
+     */
+    @BeforeClass
+    public void setUp() {
+        server = new WireMockServer(0);
+        server.start();
+        // The static stubFor/verify helpers talk to whichever port configureFor() was given.
+        final int listeningPort = server.port();
+        configureFor(listeningPort);
+        stubFor(post(urlPathEqualTo("/")).willReturn(aResponse().withStatus(200)));
+    }
+
+    /**
+     * Stops the shared WireMock server.
+     *
+     * <p>{@code alwaysRun = true} makes TestNG invoke this method even when an earlier configuration
+     * method failed, so a started server is not left running. The null guard covers the case where
+     * {@code setUp} failed before the server was created.
+     */
+    @AfterClass(alwaysRun = true)
+    public void tearDown() {
+        if (server != null) {
+            server.stop();
+        }
+    }
+
+    @DataProvider(name = "primitives")
+    public Object[][] primitives() {
+        return new Object[][]{
+            new Object[] {Schema.STRING, "test-string", "\"test-string\""},
+            new Object[] {Schema.INT8, (byte) 42, "42"},
+            new Object[] {Schema.INT16, (short) 42, "42"},
+            new Object[] {Schema.INT32, 42, "42"},
+            new Object[] {Schema.INT64, 42L, "42"},
+            new Object[] {Schema.BOOL, true, "true"},
+            new Object[] {Schema.FLOAT, 0.1F, "0.1"},
+            new Object[] {Schema.DOUBLE, 0.1D, "0.1"},
+            new Object[] {Schema.DATE, new Date(1662418008047L), 
"1662418008047"},
+            new Object[] {Schema.TIME, new Time(0, 46, 48), "\"00:46:48\""},
+            new Object[] {Schema.TIMESTAMP, new Timestamp(1662418008047L), 
"1662418008047"},
+            new Object[] {Schema.INSTANT, 
Instant.ofEpochMilli(1662418008047L), "1662418008.047000000"},
+            new Object[] {Schema.LOCAL_DATE, LocalDate.of(2022, 1, 1), 
"[2022,1,1]"},
+            new Object[] {Schema.LOCAL_TIME, LocalTime.of(11, 12), "[11,12]"},
+            new Object[] {Schema.LOCAL_DATE_TIME, new 
Timestamp(1662418008047L), "1662418008047"},
+        };
+    }
+
+    /**
+     * Wraps one primitive value in a {@link GenericObject} and checks the body the sink posts for it.
+     *
+     * @param schema schema reported by the record
+     * @param value native object returned by the wrapping {@link GenericObject}
+     * @param responseBody JSON text expected as the HTTP request body
+     * @throws Exception if the sink fails to open or write
+     */
+    @Test(dataProvider = "primitives")
+    public void testPrimitives(Schema<?> schema, Object value, String responseBody) throws Exception {
+        test(schema, new GenericObject() {
+            @Override
+            public Object getNativeObject() {
+                return value;
+            }
+
+            @Override
+            public SchemaType getSchemaType() {
+                // The wrapper deliberately reports no schema type.
+                return null;
+            }
+        }, responseBody);
+    }
+
+    @DataProvider(name = "schema")
+    public Object[][] schema() {
+        return new Object[][]{
+            new Object[]{Schema.JSON(Object.class)},
+            new Object[]{Schema.AVRO(Object.class)},
+        };
+    }
+
+    /**
+     * Sends a generic record that contains two scalar fields and one nested record, and checks the
+     * JSON document posted by the sink. Runs once per entry of the {@code schema} provider.
+     *
+     * @param schema schema whose type (JSON or AVRO) is used to build the generic schemas
+     * @throws Exception if the sink fails to open or write
+     */
+    @Test(dataProvider = "schema")
+    public void testGenericRecord(Schema<?> schema) throws Exception {
+        SchemaType recordType = schema.getSchemaInfo().getType();
+
+        // Nested record schema, used as the type of field "e" of the outer record.
+        RecordSchemaBuilder nestedBuilder = SchemaBuilder.record("type1");
+        nestedBuilder.field("a").type(SchemaType.STRING).optional().defaultValue(null);
+        nestedBuilder.field("b").type(SchemaType.BOOLEAN).optional().defaultValue(null);
+        nestedBuilder.field("d").type(SchemaType.DOUBLE).optional().defaultValue(null);
+        nestedBuilder.field("f").type(SchemaType.FLOAT).optional().defaultValue(null);
+        nestedBuilder.field("i").type(SchemaType.INT32).optional().defaultValue(null);
+        nestedBuilder.field("l").type(SchemaType.INT64).optional().defaultValue(null);
+        GenericSchema<GenericRecord> nestedSchema = Schema.generic(nestedBuilder.build(recordType));
+
+        RecordSchemaBuilder outerBuilder = SchemaBuilder.record("value");
+        outerBuilder.field("c").type(SchemaType.STRING).optional().defaultValue(null);
+        outerBuilder.field("d").type(SchemaType.INT32).optional().defaultValue(null);
+        outerBuilder.field("e", nestedSchema).type(recordType).optional().defaultValue(null);
+        GenericSchema<GenericRecord> outerSchema = Schema.generic(outerBuilder.build(recordType));
+
+        GenericRecord nestedRecord = nestedSchema.newRecordBuilder()
+            .set("a", "a")
+            .set("b", true)
+            .set("d", 1.0)
+            .set("f", 1.0f)
+            .set("i", 1)
+            .set("l", 10L)
+            .build();
+        GenericRecord outerRecord = outerSchema.newRecordBuilder()
+            .set("c", "1")
+            .set("d", 1)
+            .set("e", nestedRecord)
+            .build();
+
+        test(schema, outerRecord,
+            "{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}");
+    }
+
+    /**
+     * Sends a {@link KeyValue} of two strings and checks that the sink posts a JSON object with a
+     * {@code value} member followed by a {@code key} member.
+     *
+     * @throws Exception if the sink fails to open or write
+     */
+    @Test
+    public void testKeyValuePrimitives() throws Exception {
+        GenericObject keyValueHolder = new GenericObject() {
+            @Override
+            public Object getNativeObject() {
+                return new KeyValue<>("test-key", "test-value");
+            }
+
+            @Override
+            public SchemaType getSchemaType() {
+                return null;
+            }
+        };
+        Schema<KeyValue<String, String>> stringPairSchema = KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING);
+        test(stringPairSchema, keyValueHolder, "{\"value\":\"test-value\",\"key\":\"test-key\"}");
+    }
+
+    /**
+     * Sends a {@link KeyValue} whose key and value are both generic records (the value also holds a
+     * nested record) and checks the JSON document posted by the sink. Runs once per entry of the
+     * {@code schema} provider.
+     *
+     * @param schema schema whose type (JSON or AVRO) is used to build the generic schemas
+     * @throws Exception if the sink fails to open or write
+     */
+    @Test(dataProvider = "schema")
+    public void testKeyValueGenericRecord(Schema<?> schema) throws Exception {
+        SchemaType recordType = schema.getSchemaInfo().getType();
+
+        // Key side: a flat record with one string and one int field.
+        RecordSchemaBuilder keyBuilder = SchemaBuilder.record("key");
+        keyBuilder.field("a").type(SchemaType.STRING).optional().defaultValue(null);
+        keyBuilder.field("b").type(SchemaType.INT32).optional().defaultValue(null);
+        GenericSchema<GenericRecord> keySchema = Schema.generic(keyBuilder.build(recordType));
+        GenericRecord keyRecord = keySchema.newRecordBuilder()
+            .set("a", "1")
+            .set("b", 1)
+            .build();
+
+        // Value side: nested record schema first, then the outer record that references it as "e".
+        RecordSchemaBuilder nestedBuilder = SchemaBuilder.record("type1");
+        nestedBuilder.field("a").type(SchemaType.STRING).optional().defaultValue(null);
+        nestedBuilder.field("b").type(SchemaType.BOOLEAN).optional().defaultValue(null);
+        nestedBuilder.field("d").type(SchemaType.DOUBLE).optional().defaultValue(null);
+        nestedBuilder.field("f").type(SchemaType.FLOAT).optional().defaultValue(null);
+        nestedBuilder.field("i").type(SchemaType.INT32).optional().defaultValue(null);
+        nestedBuilder.field("l").type(SchemaType.INT64).optional().defaultValue(null);
+        GenericSchema<GenericRecord> nestedSchema = Schema.generic(nestedBuilder.build(recordType));
+
+        RecordSchemaBuilder valueBuilder = SchemaBuilder.record("value");
+        valueBuilder.field("c").type(SchemaType.STRING).optional().defaultValue(null);
+        valueBuilder.field("d").type(SchemaType.INT32).optional().defaultValue(null);
+        valueBuilder.field("e", nestedSchema).type(recordType).optional().defaultValue(null);
+        GenericSchema<GenericRecord> valueSchema = Schema.generic(valueBuilder.build(recordType));
+
+        GenericRecord nestedRecord = nestedSchema.newRecordBuilder()
+            .set("a", "a")
+            .set("b", true)
+            .set("d", 1.0)
+            .set("f", 1.0f)
+            .set("i", 1)
+            .set("l", 10L)
+            .build();
+        GenericRecord valueRecord = valueSchema.newRecordBuilder()
+            .set("c", "1")
+            .set("d", 1)
+            .set("e", nestedRecord)
+            .build();
+
+        KeyValue<GenericRecord, GenericRecord> pair = new KeyValue<>(keyRecord, valueRecord);
+        Schema<KeyValue<GenericRecord, GenericRecord>> pairSchema =
+            Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
+        GenericObject pairHolder = new GenericObject() {
+            @Override
+            public Object getNativeObject() {
+                return pair;
+            }
+
+            @Override
+            public SchemaType getSchemaType() {
+                return SchemaType.KEY_VALUE;
+            }
+        };
+
+        String expectedValueJson =
+            "{\"value\":{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,"
+            + "\"i\":1,\"l\":10}},";
+        String expectedKeyJson = "\"key\":{\"a\":\"1\",\"b\":1}}";
+        test(pairSchema, pairHolder, expectedValueJson + expectedKeyJson);
+    }
+
+    private void test(Schema<?> schema, GenericObject genericObject, String 
responseBody) throws Exception {
+        HttpSink httpSink = new HttpSink();
+        Map<String, Object> config = new HashMap<>();
+        config.put("url", server.baseUrl());
+        Map<String, String> headers = new HashMap<>();
+        headers.put("header-name", "header-value");
+        config.put("headers", headers);
+        httpSink.open(config, null);
+
+        long now = 1662418008000L;
+        Map<String, String> messageProperties = new HashMap<>();
+        messageProperties.put("prop-name", "prop-value");
+
+        Record<GenericObject> record = new Record<>() {
+            @Override
+            public GenericObject getValue() {
+                return genericObject;
+            }
+
+            @Override
+            public Schema getSchema() {
+                return schema;
+            }
+
+            @Override
+            public Optional<Long> getEventTime() {
+                return Optional.of(now);
+            }
+
+            @Override
+            public Map<String, String> getProperties() {
+                return messageProperties;
+            }
+
+            @Override
+            public Optional<String> getTopicName() {
+                return Optional.of("test-topic");
+            }
+
+            @Override
+            public Optional<String> getKey() {
+                return Optional.of("test-key");
+            }
+
+            @Override
+            public Optional<Message<GenericObject>> getMessage() {
+                return Optional.of(new Message<>() {
+
+                    @Override
+                    public Map<String, String> getProperties() {
+                        return null;
+                    }
+
+                    @Override
+                    public boolean hasProperty(String name) {
+                        return false;
+                    }
+
+                    @Override
+                    public String getProperty(String name) {
+                        return null;
+                    }
+
+                    @Override
+                    public byte[] getData() {
+                        return new byte[0];
+                    }
+
+                    @Override
+                    public int size() {
+                        return 0;
+                    }
+
+                    @Override
+                    public GenericObject getValue() {
+                        return null;
+                    }
+
+                    @Override
+                    public MessageId getMessageId() {
+                        return new MessageIdImpl(1, 2, 3);
+                    }
+
+                    @Override
+                    public long getPublishTime() {
+                        return now + 1;
+                    }
+
+                    @Override
+                    public long getEventTime() {
+                        return 0;
+                    }
+
+                    @Override
+                    public long getSequenceId() {
+                        return 0;
+                    }
+
+                    @Override
+                    public String getProducerName() {
+                        return null;
+                    }
+
+                    @Override
+                    public boolean hasKey() {
+                        return false;
+                    }
+
+                    @Override
+                    public String getKey() {
+                        return null;
+                    }
+
+                    @Override
+                    public boolean hasBase64EncodedKey() {
+                        return false;
+                    }
+
+                    @Override
+                    public byte[] getKeyBytes() {
+                        return new byte[0];
+                    }
+
+                    @Override
+                    public boolean hasOrderingKey() {
+                        return false;
+                    }
+
+                    @Override
+                    public byte[] getOrderingKey() {
+                        return new byte[0];
+                    }
+
+                    @Override
+                    public String getTopicName() {
+                        return null;
+                    }
+
+                    @Override
+                    public Optional<EncryptionContext> getEncryptionCtx() {
+                        return Optional.empty();
+                    }
+
+                    @Override
+                    public int getRedeliveryCount() {
+                        return 0;
+                    }
+
+                    @Override
+                    public byte[] getSchemaVersion() {
+                        return new byte[0];
+                    }
+
+                    @Override
+                    public boolean isReplicated() {
+                        return false;
+                    }
+
+                    @Override
+                    public String getReplicatedFrom() {
+                        return null;
+                    }
+
+                    @Override
+                    public void release() {
+
+                    }
+
+                    @Override
+                    public boolean hasBrokerPublishTime() {
+                        return false;
+                    }
+
+                    @Override
+                    public Optional<Long> getBrokerPublishTime() {
+                        return Optional.empty();
+                    }
+
+                    @Override
+                    public boolean hasIndex() {
+                        return false;
+                    }
+
+                    @Override
+                    public Optional<Long> getIndex() {
+                        return Optional.empty();
+                    }
+                });
+            }
+        };
+        httpSink.write(record);
+
+        verify(postRequestedFor(urlEqualTo("/"))
+            .withRequestBody(equalTo(responseBody))
+            .withHeader("Content-Type", equalTo("application/json"))
+            .withHeader("header-name", equalTo("header-value"))
+            .withHeader("PulsarTopic", equalTo("test-topic"))
+            .withHeader("PulsarKey", equalTo("test-key"))
+            .withHeader("PulsarEventTime", equalTo("1662418008000"))
+            .withHeader("PulsarPublishTime", equalTo("1662418008001"))
+            .withHeader("PulsarMessageId", equalTo("CAEQAhgDMAA="))
+            .withHeader("PulsarProperties-prop-name", equalTo("prop-value"))
+        );
+    }
+
+    /**
+     * Checks that writing a record fails with an {@link IOException} when the endpoint answers
+     * HTTP 500.
+     *
+     * <p>The WireMock server is shared by the whole class and the most recently registered stub takes
+     * precedence, so the 200 stub is registered again afterwards. Otherwise every test that happens
+     * to run after this one would also receive the 500 response.
+     *
+     * @throws Exception the expected {@link IOException} raised by the sink
+     */
+    @Test(expectedExceptions = IOException.class)
+    public void testRequestFailure() throws Exception {
+        stubFor(post(urlPathEqualTo("/"))
+            .willReturn(aResponse().withStatus(500)));
+        try {
+            testKeyValuePrimitives();
+        } finally {
+            // Re-register the success stub so the failure response does not leak into other tests.
+            stubFor(post(urlPathEqualTo("/"))
+                .willReturn(aResponse().withStatus(200)));
+        }
+    }
+}
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index b2c400117a5..a5e096aff59 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -51,6 +51,7 @@
         <module>twitter</module>
         <module>cassandra</module>
         <module>aerospike</module>
+        <module>http</module>
         <module>kafka</module>
         <module>rabbitmq</module>
         <module>kinesis</module>
@@ -88,6 +89,7 @@
         <module>twitter</module>
         <module>cassandra</module>
         <module>aerospike</module>
+        <module>http</module>
         <module>kafka</module>
         <module>rabbitmq</module>
         <module>kinesis</module>


Reply via email to