This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 03082bc5eca [improve][io] Add PUT method support to HttpSink (#25133)
03082bc5eca is described below

commit 03082bc5eca615d29595c0ebc80bfc4bb6c89423
Author: Dream95 <[email protected]>
AuthorDate: Wed Jan 14 21:55:20 2026 +0800

    [improve][io] Add PUT method support to HttpSink (#25133)
    
    Signed-off-by: Dream95 <[email protected]>
---
 .../main/java/org/apache/pulsar/io/http/HttpSink.java |  2 +-
 .../org/apache/pulsar/io/http/HttpSinkConfig.java     |  8 ++++++++
 .../java/org/apache/pulsar/io/http/HttpSinkTest.java  | 19 ++++++++++++++++++-
 3 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSink.java b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSink.java
index 31b5053ba7a..c59fcaadfb1 100644
--- a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSink.java
+++ b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSink.java
@@ -70,7 +70,7 @@ public class HttpSink implements Sink<GenericObject> {
         byte[] bytes = mapper.writeValueAsBytes(json);
         HttpRequest.Builder builder = HttpRequest.newBuilder()
             .uri(uri)
-            .POST(HttpRequest.BodyPublishers.ofByteArray(bytes));
+            .method(httpSinkConfig.getHttpMethod().name(), HttpRequest.BodyPublishers.ofByteArray(bytes));
         httpSinkConfig.getHeaders().forEach(builder::header);
         record.getProperties().forEach((k, v) -> builder.header("PulsarProperties-" + k, v));
         record.getTopicName().ifPresent(topic -> builder.header("PulsarTopic", topic));
diff --git a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSinkConfig.java b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSinkConfig.java
index 2d51e7412fc..a6dc6051d34 100644
--- a/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSinkConfig.java
+++ b/pulsar-io/http/src/main/java/org/apache/pulsar/io/http/HttpSinkConfig.java
@@ -46,6 +46,14 @@ public class HttpSinkConfig implements Serializable {
         help = "The list of default headers added to each request")
     private Map<String, String> headers = new HashMap<>();
 
+    @FieldDoc(defaultValue = "POST",
+            help = "The HTTP method to use in the request,support POST/PUT")
+    private HttpMethod httpMethod = HttpMethod.POST;
+
+    public enum HttpMethod {
+        POST,
+        PUT
+    }
     public static HttpSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(yamlFile), HttpSinkConfig.class);
diff --git a/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java b/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
index 6ad8f45e731..7f09030b2f9 100644
--- a/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
+++ b/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
@@ -24,11 +24,14 @@ import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
 import static com.github.tomakehurst.wiremock.client.WireMock.equalToJson;
 import static com.github.tomakehurst.wiremock.client.WireMock.post;
 import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.put;
+import static com.github.tomakehurst.wiremock.client.WireMock.putRequestedFor;
 import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
 import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
 import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
 import static com.github.tomakehurst.wiremock.client.WireMock.verify;
 import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.matching.RequestPatternBuilder;
 import java.io.IOException;
 import java.sql.Time;
 import java.sql.Timestamp;
@@ -70,6 +73,8 @@ public class HttpSinkTest {
         configureFor(server.port());
         stubFor(post(urlPathEqualTo("/"))
             .willReturn(aResponse().withStatus(200)));
+        stubFor(put(urlPathEqualTo("/"))
+                .willReturn(aResponse().withStatus(200)));
     }
 
     @AfterClass
@@ -233,12 +238,19 @@ public class HttpSinkTest {
     }
 
     private void test(Schema<?> schema, GenericObject genericObject, String responseBody) throws Exception {
+        test(HttpSinkConfig.HttpMethod.PUT.name(), schema, genericObject, responseBody);
+        test(HttpSinkConfig.HttpMethod.POST.name(), schema, genericObject, responseBody);
+    }
+
+    private void test(String httpMethod, Schema<?> schema,
+                      GenericObject genericObject, String responseBody) throws Exception {
         HttpSink httpSink = new HttpSink();
         Map<String, Object> config = new HashMap<>();
         config.put("url", server.baseUrl());
         Map<String, String> headers = new HashMap<>();
         headers.put("header-name", "header-value");
         config.put("headers", headers);
+        config.put("httpMethod", httpMethod);
         httpSink.open(config, null);
 
         long now = 1662418008000L;
@@ -428,8 +440,13 @@ public class HttpSinkTest {
             }
         };
         httpSink.write(record);
+        RequestPatternBuilder requestPatternBuilder = switch (httpMethod) {
+            case "POST" -> postRequestedFor(urlEqualTo("/"));
+            case "PUT" -> putRequestedFor(urlEqualTo("/"));
+            default -> throw new IllegalArgumentException("UnSupport httpMethod: " + httpMethod);
+        };
 
-        verify(postRequestedFor(urlEqualTo("/"))
+        verify(requestPatternBuilder
             .withRequestBody(equalToJson(responseBody))
             .withHeader("Content-Type", equalTo("application/json"))
             .withHeader("header-name", equalTo("header-value"))

Reply via email to