This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 58fb2d562be CAMEL-18369: camel-stream - Add http to read stream from 
remote http server
58fb2d562be is described below

commit 58fb2d562be96322188eefd895fad5def88a4eb9
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Aug 9 13:05:08 2022 +0200

    CAMEL-18369: camel-stream - Add http to read stream from remote http server
---
 .../component/stream/StreamEndpointConfigurer.java | 12 ++++
 .../component/stream/StreamEndpointUriFactory.java |  4 +-
 .../org/apache/camel/component/stream/stream.json  |  4 +-
 .../src/main/docs/stream-component.adoc            | 12 ++++
 .../camel/component/stream/StreamConsumer.java     | 76 ++++++++++++++++++++--
 .../camel/component/stream/StreamEndpoint.java     | 31 ++++++++-
 .../camel/component/stream/StreamProducer.java     |  2 +-
 7 files changed, 132 insertions(+), 9 deletions(-)

diff --git 
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
 
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
index 5a937b7daa0..cfce6d3cbe8 100644
--- 
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
+++ 
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
@@ -43,6 +43,10 @@ public class StreamEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "groupLines": target.setGroupLines(property(camelContext, 
int.class, value)); return true;
         case "groupstrategy":
         case "groupStrategy": target.setGroupStrategy(property(camelContext, 
org.apache.camel.component.stream.GroupStrategy.class, value)); return true;
+        case "httpheaders":
+        case "httpHeaders": target.setHttpHeaders(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "httpurl":
+        case "httpUrl": target.setHttpUrl(property(camelContext, 
java.lang.String.class, value)); return true;
         case "initialpromptdelay":
         case "initialPromptDelay": 
target.setInitialPromptDelay(property(camelContext, long.class, value)); return 
true;
         case "lazystartproducer":
@@ -89,6 +93,10 @@ public class StreamEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "groupLines": return int.class;
         case "groupstrategy":
         case "groupStrategy": return 
org.apache.camel.component.stream.GroupStrategy.class;
+        case "httpheaders":
+        case "httpHeaders": return java.lang.String.class;
+        case "httpurl":
+        case "httpUrl": return java.lang.String.class;
         case "initialpromptdelay":
         case "initialPromptDelay": return long.class;
         case "lazystartproducer":
@@ -136,6 +144,10 @@ public class StreamEndpointConfigurer extends 
PropertyConfigurerSupport implemen
         case "groupLines": return target.getGroupLines();
         case "groupstrategy":
         case "groupStrategy": return target.getGroupStrategy();
+        case "httpheaders":
+        case "httpHeaders": return target.getHttpHeaders();
+        case "httpurl":
+        case "httpUrl": return target.getHttpUrl();
         case "initialpromptdelay":
         case "initialPromptDelay": return target.getInitialPromptDelay();
         case "lazystartproducer":
diff --git 
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
 
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
index d16e85ed5a1..a503551d4cc 100644
--- 
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
+++ 
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class StreamEndpointUriFactory extends 
org.apache.camel.support.component
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(22);
+        Set<String> props = new HashSet<>(24);
         props.add("appendNewLine");
         props.add("autoCloseCount");
         props.add("bridgeErrorHandler");
@@ -34,6 +34,8 @@ public class StreamEndpointUriFactory extends 
org.apache.camel.support.component
         props.add("fileWatcher");
         props.add("groupLines");
         props.add("groupStrategy");
+        props.add("httpHeaders");
+        props.add("httpUrl");
         props.add("initialPromptDelay");
         props.add("kind");
         props.add("lazyStartProducer");
diff --git 
a/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
 
b/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
index 1bb541ffdc4..8364034a4ce 100644
--- 
a/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
+++ 
b/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
@@ -31,12 +31,14 @@
     "CamelStreamComplete": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "boolean", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Is complete", "constantName": 
"org.apache.camel.component.stream.StreamConstants#STREAM_COMPLETE" }
   },
   "properties": {
-    "kind": { "kind": "path", "displayName": "Kind", "group": "common", 
"label": "", "required": true, "type": "string", "javaType": 
"java.lang.String", "enum": [ "in", "out", "err", "header", "file" ], 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Kind of stream to use such as System.in or System.out." 
},
+    "kind": { "kind": "path", "displayName": "Kind", "group": "common", 
"label": "", "required": true, "type": "string", "javaType": 
"java.lang.String", "enum": [ "in", "out", "err", "header", "file", "http" ], 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "Kind of stream to use such as System.in, System.out, a 
file, or a http url." },
     "encoding": { "kind": "parameter", "displayName": "Encoding", "group": 
"common", "label": "", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "You can configure the encoding (is a charset name) to use 
text-based streams (for example, message body is a String object). If not 
provided, Camel uses the JVM default Charset." },
     "fileName": { "kind": "parameter", "displayName": "File Name", "group": 
"common", "label": "", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "When using the stream:file URI format, this option specifies 
the filename to stream to\/from." },
     "fileWatcher": { "kind": "parameter", "displayName": "File Watcher", 
"group": "consumer", "label": "consumer", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "description": "To use JVM file watcher to listen 
for file change events to support re-loading files that may be overwritten, 
somewhat like tail --retry" },
     "groupLines": { "kind": "parameter", "displayName": "Group Lines", 
"group": "consumer", "label": "consumer", "required": false, "type": "integer", 
"javaType": "int", "deprecated": false, "autowired": false, "secret": false, 
"description": "To group X number of lines in the consumer. For example to 
group 10 lines and therefore only spit out an Exchange with 10 lines, instead 
of 1 Exchange per line." },
     "groupStrategy": { "kind": "parameter", "displayName": "Group Strategy", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "org.apache.camel.component.stream.GroupStrategy", "deprecated": 
false, "autowired": false, "secret": false, "description": "Allows to use a 
custom GroupStrategy to control how to group lines." },
+    "httpHeaders": { "kind": "parameter", "displayName": "Http Headers", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "When using stream:http format, this option 
specifies optional http headers, such as Accept: application\/json. Multiple 
headers can be separated by comma." },
+    "httpUrl": { "kind": "parameter", "displayName": "Http Url", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "When using stream:http format, this option 
specifies the http url to stream from." },
     "initialPromptDelay": { "kind": "parameter", "displayName": "Initial 
Prompt Delay", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 2000, "description": "Initial delay in 
milliseconds before showing the message prompt. This delay occurs only once. 
Can be used during system startup to avoid message prompts being written while 
other logging is done to the system [...]
     "promptDelay": { "kind": "parameter", "displayName": "Prompt Delay", 
"group": "consumer", "label": "consumer", "required": false, "type": "integer", 
"javaType": "long", "deprecated": false, "autowired": false, "secret": false, 
"description": "Optional delay in milliseconds before showing the message 
prompt." },
     "promptMessage": { "kind": "parameter", "displayName": "Prompt Message", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "description": "Message prompt to use when reading from 
stream:in; for example, you could set this to Enter a command:" },
diff --git a/components/camel-stream/src/main/docs/stream-component.adoc 
b/components/camel-stream/src/main/docs/stream-component.adoc
index 65bd0ba4e27..2bcb7554397 100644
--- a/components/camel-stream/src/main/docs/stream-component.adoc
+++ b/components/camel-stream/src/main/docs/stream-component.adoc
@@ -37,6 +37,7 @@ stream:out[?options]
 stream:err[?options]
 stream:header[?options]
 stream:file?fileName=/foo/bar.txt
+stream:http?httpUrl=http:myserver:8080/data
 -----------------------
 
 If the `stream:header` URI is specified, the `stream` header is used to
@@ -113,5 +114,16 @@ 
from("stream:file?fileName=/server/logs/server.log&scanStream=true&scanStreamDel
   .to("bean:logService?method=parseLogLine");
 ----
 
+== Reading HTTP server side streaming
+
+The camel-stream component has basic support for connecting to a remote HTTP 
server and read streaming data
+(chunk of data separated by new-line).
+
+[source,java]
+----
+from("stream:http?scanStream=true&httpUrl=http://localhost:8500";)
+  .to("log:input");
+----
+
 
 include::spring-boot:partial$starter.adoc[]
diff --git 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
index db40aed0402..c49b43717a7 100644
--- 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
+++ 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
@@ -22,6 +22,9 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -46,7 +49,7 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StreamConsumer.class);
 
-    private static final String TYPES = "in,file";
+    private static final String TYPES = "in,file,http";
     private static final String INVALID_URI = "Invalid uri, valid form: 
'stream:{" + TYPES + "}'";
     private static final List<String> TYPES_LIST = 
Arrays.asList(TYPES.split(","));
     private ExecutorService executor;
@@ -54,6 +57,7 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
     private volatile boolean watchFileChanged;
     private volatile InputStream inputStream = System.in;
     private volatile InputStream inputStreamToClose;
+    private volatile URLConnection urlConnectionToClose;
     private volatile File file;
     private StreamEndpoint endpoint;
     private String uri;
@@ -111,8 +115,11 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
         ServiceHelper.stopAndShutdownService(fileWatcher);
         lines.clear();
 
-        // do not close regular inputStream as it may be System.in etc.
         IOHelper.close(inputStreamToClose);
+        if (urlConnectionToClose != null) {
+            closeURLConnection(urlConnectionToClose);
+            urlConnectionToClose = null;
+        }
         super.doStop();
     }
 
@@ -134,6 +141,9 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
     private BufferedReader initializeStreamLineMode() throws Exception {
         // close old stream, before obtaining a new stream
         IOHelper.close(inputStreamToClose);
+        if (urlConnectionToClose != null) {
+            closeURLConnection(urlConnectionToClose);
+        }
 
         if ("in".equals(uri)) {
             inputStream = System.in;
@@ -141,11 +151,19 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
         } else if ("file".equals(uri)) {
             inputStream = resolveStreamFromFile();
             inputStreamToClose = inputStream;
+        } else if ("http".equals(uri)) {
+            inputStream = resolveStreamFromUrl();
+            inputStreamToClose = inputStream;
         }
 
         if (inputStream != null) {
-            Charset charset = endpoint.getCharset();
-            return IOHelper.buffered(new InputStreamReader(inputStream, 
charset));
+            if ("http".equals(uri)) {
+                // read as-is
+                return IOHelper.buffered(new InputStreamReader(inputStream));
+            } else {
+                Charset charset = endpoint.getCharset();
+                return IOHelper.buffered(new InputStreamReader(inputStream, 
charset));
+            }
         } else {
             return null;
         }
@@ -154,13 +172,20 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
     private InputStream initializeStreamRawMode() throws Exception {
         // close old stream, before obtaining a new stream
         IOHelper.close(inputStreamToClose);
+        if (urlConnectionToClose != null) {
+            closeURLConnection(urlConnectionToClose);
+        }
 
         if ("in".equals(uri)) {
             inputStream = System.in;
+            // do not close regular inputStream as it may be System.in etc.
             inputStreamToClose = null;
         } else if ("file".equals(uri)) {
             inputStream = resolveStreamFromFile();
             inputStreamToClose = inputStream;
+        } else if ("http".equals(uri)) {
+            inputStream = resolveStreamFromUrl();
+            inputStreamToClose = inputStream;
         }
 
         return inputStream;
@@ -405,6 +430,39 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
         return fileStream;
     }
 
+    private InputStream resolveStreamFromUrl() throws IOException {
+        String url = endpoint.getHttpUrl();
+        StringHelper.notEmpty(url, "httpUrl");
+
+        urlConnectionToClose = new URL(url).openConnection();
+        urlConnectionToClose.setUseCaches(false);
+        String headers = endpoint.getHttpHeaders();
+        if (headers != null) {
+            for (String h : headers.split(",")) {
+                String k = StringHelper.before(h, "=");
+                String v = StringHelper.after(h, "=");
+                if (k != null && v != null) {
+                    urlConnectionToClose.setRequestProperty(k, v);
+                }
+            }
+        }
+
+        InputStream is;
+
+        try {
+            is = urlConnectionToClose.getInputStream();
+        } catch (IOException e) {
+            // close the http connection to avoid
+            // leaking gaps in case of an exception
+            if (urlConnectionToClose instanceof HttpURLConnection) {
+                ((HttpURLConnection) urlConnectionToClose).disconnect();
+            }
+            throw e;
+        }
+
+        return is;
+    }
+
     private void validateUri(String uri) throws IllegalArgumentException {
         String[] s = uri.split(":");
         if (s.length < 2) {
@@ -434,4 +492,14 @@ public class StreamConsumer extends DefaultConsumer 
implements Runnable {
         return exchange;
     }
 
+    private static void closeURLConnection(URLConnection con) {
+        if (con instanceof HttpURLConnection) {
+            try {
+                ((HttpURLConnection) con).disconnect();
+            } catch (Exception e) {
+                // ignore
+            }
+        }
+    }
+
 }
diff --git 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
index 8ca1b54341d..7e27e2e0860 100644
--- 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
+++ 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
@@ -42,12 +42,16 @@ public class StreamEndpoint extends DefaultEndpoint {
 
     private transient Charset charset;
 
-    @UriPath(enums = "in,out,err,header,file")
+    @UriPath(enums = "in,out,err,header,file,http")
     @Metadata(required = true)
     private String kind;
     @UriParam
     private String fileName;
     @UriParam(label = "consumer")
+    private String httpUrl;
+    @UriParam(label = "consumer")
+    private String httpHeaders;
+    @UriParam(label = "consumer")
     private boolean scanStream;
     @UriParam(label = "consumer")
     private boolean retry;
@@ -107,7 +111,7 @@ public class StreamEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Kind of stream to use such as System.in or System.out.
+     * Kind of stream to use such as System.in, System.out, a file, or a http 
url.
      */
     public void setKind(String kind) {
         this.kind = kind;
@@ -124,6 +128,29 @@ public class StreamEndpoint extends DefaultEndpoint {
         this.fileName = fileName;
     }
 
+    public String getHttpUrl() {
+        return httpUrl;
+    }
+
+    /**
+     * When using stream:http format, this option specifies the http url to 
stream from.
+     */
+    public void setHttpUrl(String httpUrl) {
+        this.httpUrl = httpUrl;
+    }
+
+    public String getHttpHeaders() {
+        return httpHeaders;
+    }
+
+    /**
+     * When using stream:http format, this option specifies optional http 
headers, such as Accept: application/json.
+     * Multiple headers can be separated by comma.
+     */
+    public void setHttpHeaders(String httpHeaders) {
+        this.httpHeaders = httpHeaders;
+    }
+
     public long getDelay() {
         return delay;
     }
diff --git 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
index c52dc8250b3..5c5ed1cdafe 100644
--- 
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
+++ 
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
@@ -44,7 +44,7 @@ public class StreamProducer extends DefaultProducer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StreamProducer.class);
 
-    private static final String TYPES = "out,err,file,header,url";
+    private static final String TYPES = "out,err,file,header";
     private static final String INVALID_URI = "Invalid uri, valid form: 
'stream:{" + TYPES + "}'";
     private static final List<String> TYPES_LIST = 
Arrays.asList(TYPES.split(","));
     private StreamEndpoint endpoint;

Reply via email to