This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 58fb2d562be CAMEL-18369: camel-stream - Add http to read stream from
remote http server
58fb2d562be is described below
commit 58fb2d562be96322188eefd895fad5def88a4eb9
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Aug 9 13:05:08 2022 +0200
CAMEL-18369: camel-stream - Add http to read stream from remote http server
---
.../component/stream/StreamEndpointConfigurer.java | 12 ++++
.../component/stream/StreamEndpointUriFactory.java | 4 +-
.../org/apache/camel/component/stream/stream.json | 4 +-
.../src/main/docs/stream-component.adoc | 12 ++++
.../camel/component/stream/StreamConsumer.java | 76 ++++++++++++++++++++--
.../camel/component/stream/StreamEndpoint.java | 31 ++++++++-
.../camel/component/stream/StreamProducer.java | 2 +-
7 files changed, 132 insertions(+), 9 deletions(-)
diff --git
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
index 5a937b7daa0..cfce6d3cbe8 100644
---
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
+++
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointConfigurer.java
@@ -43,6 +43,10 @@ public class StreamEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "groupLines": target.setGroupLines(property(camelContext,
int.class, value)); return true;
case "groupstrategy":
case "groupStrategy": target.setGroupStrategy(property(camelContext,
org.apache.camel.component.stream.GroupStrategy.class, value)); return true;
+ case "httpheaders":
+ case "httpHeaders": target.setHttpHeaders(property(camelContext,
java.lang.String.class, value)); return true;
+ case "httpurl":
+ case "httpUrl": target.setHttpUrl(property(camelContext,
java.lang.String.class, value)); return true;
case "initialpromptdelay":
case "initialPromptDelay":
target.setInitialPromptDelay(property(camelContext, long.class, value)); return
true;
case "lazystartproducer":
@@ -89,6 +93,10 @@ public class StreamEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "groupLines": return int.class;
case "groupstrategy":
case "groupStrategy": return
org.apache.camel.component.stream.GroupStrategy.class;
+ case "httpheaders":
+ case "httpHeaders": return java.lang.String.class;
+ case "httpurl":
+ case "httpUrl": return java.lang.String.class;
case "initialpromptdelay":
case "initialPromptDelay": return long.class;
case "lazystartproducer":
@@ -136,6 +144,10 @@ public class StreamEndpointConfigurer extends
PropertyConfigurerSupport implemen
case "groupLines": return target.getGroupLines();
case "groupstrategy":
case "groupStrategy": return target.getGroupStrategy();
+ case "httpheaders":
+ case "httpHeaders": return target.getHttpHeaders();
+ case "httpurl":
+ case "httpUrl": return target.getHttpUrl();
case "initialpromptdelay":
case "initialPromptDelay": return target.getInitialPromptDelay();
case "lazystartproducer":
diff --git
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
index d16e85ed5a1..a503551d4cc 100644
---
a/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
+++
b/components/camel-stream/src/generated/java/org/apache/camel/component/stream/StreamEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class StreamEndpointUriFactory extends
org.apache.camel.support.component
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(22);
+ Set<String> props = new HashSet<>(24);
props.add("appendNewLine");
props.add("autoCloseCount");
props.add("bridgeErrorHandler");
@@ -34,6 +34,8 @@ public class StreamEndpointUriFactory extends
org.apache.camel.support.component
props.add("fileWatcher");
props.add("groupLines");
props.add("groupStrategy");
+ props.add("httpHeaders");
+ props.add("httpUrl");
props.add("initialPromptDelay");
props.add("kind");
props.add("lazyStartProducer");
diff --git
a/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
b/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
index 1bb541ffdc4..8364034a4ce 100644
---
a/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
+++
b/components/camel-stream/src/generated/resources/org/apache/camel/component/stream/stream.json
@@ -31,12 +31,14 @@
"CamelStreamComplete": { "kind": "header", "displayName": "", "group":
"consumer", "label": "consumer", "required": false, "javaType": "boolean",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Is complete", "constantName":
"org.apache.camel.component.stream.StreamConstants#STREAM_COMPLETE" }
},
"properties": {
- "kind": { "kind": "path", "displayName": "Kind", "group": "common",
"label": "", "required": true, "type": "string", "javaType":
"java.lang.String", "enum": [ "in", "out", "err", "header", "file" ],
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Kind of stream to use such as System.in or System.out."
},
+ "kind": { "kind": "path", "displayName": "Kind", "group": "common",
"label": "", "required": true, "type": "string", "javaType":
"java.lang.String", "enum": [ "in", "out", "err", "header", "file", "http" ],
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "description": "Kind of stream to use such as System.in, System.out, a
file, or a http url." },
"encoding": { "kind": "parameter", "displayName": "Encoding", "group":
"common", "label": "", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "You can configure the encoding (is a charset name) to use
text-based streams (for example, message body is a String object). If not
provided, Camel uses the JVM default Charset." },
"fileName": { "kind": "parameter", "displayName": "File Name", "group":
"common", "label": "", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "When using the stream:file URI format, this option specifies
the filename to stream to\/from." },
"fileWatcher": { "kind": "parameter", "displayName": "File Watcher",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "description": "To use JVM file watcher to listen
for file change events to support re-loading files that may be overwritten,
somewhat like tail --retry" },
"groupLines": { "kind": "parameter", "displayName": "Group Lines",
"group": "consumer", "label": "consumer", "required": false, "type": "integer",
"javaType": "int", "deprecated": false, "autowired": false, "secret": false,
"description": "To group X number of lines in the consumer. For example to
group 10 lines and therefore only spit out an Exchange with 10 lines, instead
of 1 Exchange per line." },
"groupStrategy": { "kind": "parameter", "displayName": "Group Strategy",
"group": "consumer", "label": "consumer", "required": false, "type": "object",
"javaType": "org.apache.camel.component.stream.GroupStrategy", "deprecated":
false, "autowired": false, "secret": false, "description": "Allows to use a
custom GroupStrategy to control how to group lines." },
+ "httpHeaders": { "kind": "parameter", "displayName": "Http Headers",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "description": "When using stream:http format, this option
specifies optional http headers, such as Accept: application\/json. Multiple
headers can be separated by comma." },
+ "httpUrl": { "kind": "parameter", "displayName": "Http Url", "group":
"consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "description": "When using stream:http format, this option
specifies the http url to stream from." },
"initialPromptDelay": { "kind": "parameter", "displayName": "Initial
Prompt Delay", "group": "consumer", "label": "consumer", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 2000, "description": "Initial delay in
milliseconds before showing the message prompt. This delay occurs only once.
Can be used during system startup to avoid message prompts being written while
other logging is done to the system [...]
"promptDelay": { "kind": "parameter", "displayName": "Prompt Delay",
"group": "consumer", "label": "consumer", "required": false, "type": "integer",
"javaType": "long", "deprecated": false, "autowired": false, "secret": false,
"description": "Optional delay in milliseconds before showing the message
prompt." },
"promptMessage": { "kind": "parameter", "displayName": "Prompt Message",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "description": "Message prompt to use when reading from
stream:in; for example, you could set this to Enter a command:" },
diff --git a/components/camel-stream/src/main/docs/stream-component.adoc
b/components/camel-stream/src/main/docs/stream-component.adoc
index 65bd0ba4e27..2bcb7554397 100644
--- a/components/camel-stream/src/main/docs/stream-component.adoc
+++ b/components/camel-stream/src/main/docs/stream-component.adoc
@@ -37,6 +37,7 @@ stream:out[?options]
stream:err[?options]
stream:header[?options]
stream:file?fileName=/foo/bar.txt
+stream:http?httpUrl=http:myserver:8080/data
-----------------------
If the `stream:header` URI is specified, the `stream` header is used to
@@ -113,5 +114,16 @@
from("stream:file?fileName=/server/logs/server.log&scanStream=true&scanStreamDel
.to("bean:logService?method=parseLogLine");
----
+== Reading HTTP server side streaming
+
+The camel-stream component has basic support for connecting to a remote HTTP
server and read streaming data
+(chunk of data separated by new-line).
+
+[source,java]
+----
+from("stream:http?scanStream=true&httpUrl=http://localhost:8500")
+ .to("log:input");
+----
+
include::spring-boot:partial$starter.adoc[]
diff --git
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
index db40aed0402..c49b43717a7 100644
---
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
+++
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
@@ -22,6 +22,9 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
@@ -46,7 +49,7 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
private static final Logger LOG =
LoggerFactory.getLogger(StreamConsumer.class);
- private static final String TYPES = "in,file";
+ private static final String TYPES = "in,file,http";
private static final String INVALID_URI = "Invalid uri, valid form:
'stream:{" + TYPES + "}'";
private static final List<String> TYPES_LIST =
Arrays.asList(TYPES.split(","));
private ExecutorService executor;
@@ -54,6 +57,7 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
private volatile boolean watchFileChanged;
private volatile InputStream inputStream = System.in;
private volatile InputStream inputStreamToClose;
+ private volatile URLConnection urlConnectionToClose;
private volatile File file;
private StreamEndpoint endpoint;
private String uri;
@@ -111,8 +115,11 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
ServiceHelper.stopAndShutdownService(fileWatcher);
lines.clear();
- // do not close regular inputStream as it may be System.in etc.
IOHelper.close(inputStreamToClose);
+ if (urlConnectionToClose != null) {
+ closeURLConnection(urlConnectionToClose);
+ urlConnectionToClose = null;
+ }
super.doStop();
}
@@ -134,6 +141,9 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
private BufferedReader initializeStreamLineMode() throws Exception {
// close old stream, before obtaining a new stream
IOHelper.close(inputStreamToClose);
+ if (urlConnectionToClose != null) {
+ closeURLConnection(urlConnectionToClose);
+ }
if ("in".equals(uri)) {
inputStream = System.in;
@@ -141,11 +151,19 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
} else if ("file".equals(uri)) {
inputStream = resolveStreamFromFile();
inputStreamToClose = inputStream;
+ } else if ("http".equals(uri)) {
+ inputStream = resolveStreamFromUrl();
+ inputStreamToClose = inputStream;
}
if (inputStream != null) {
- Charset charset = endpoint.getCharset();
- return IOHelper.buffered(new InputStreamReader(inputStream,
charset));
+ if ("http".equals(uri)) {
+ // read as-is
+ return IOHelper.buffered(new InputStreamReader(inputStream));
+ } else {
+ Charset charset = endpoint.getCharset();
+ return IOHelper.buffered(new InputStreamReader(inputStream,
charset));
+ }
} else {
return null;
}
@@ -154,13 +172,20 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
private InputStream initializeStreamRawMode() throws Exception {
// close old stream, before obtaining a new stream
IOHelper.close(inputStreamToClose);
+ if (urlConnectionToClose != null) {
+ closeURLConnection(urlConnectionToClose);
+ }
if ("in".equals(uri)) {
inputStream = System.in;
+ // do not close regular inputStream as it may be System.in etc.
inputStreamToClose = null;
} else if ("file".equals(uri)) {
inputStream = resolveStreamFromFile();
inputStreamToClose = inputStream;
+ } else if ("http".equals(uri)) {
+ inputStream = resolveStreamFromUrl();
+ inputStreamToClose = inputStream;
}
return inputStream;
@@ -405,6 +430,39 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
return fileStream;
}
+ private InputStream resolveStreamFromUrl() throws IOException {
+ String url = endpoint.getHttpUrl();
+ StringHelper.notEmpty(url, "httpUrl");
+
+ urlConnectionToClose = new URL(url).openConnection();
+ urlConnectionToClose.setUseCaches(false);
+ String headers = endpoint.getHttpHeaders();
+ if (headers != null) {
+ for (String h : headers.split(",")) {
+ String k = StringHelper.before(h, "=");
+ String v = StringHelper.after(h, "=");
+ if (k != null && v != null) {
+ urlConnectionToClose.setRequestProperty(k, v);
+ }
+ }
+ }
+
+ InputStream is;
+
+ try {
+ is = urlConnectionToClose.getInputStream();
+ } catch (IOException e) {
+ // close the http connection to avoid
+ // leaking gaps in case of an exception
+ if (urlConnectionToClose instanceof HttpURLConnection) {
+ ((HttpURLConnection) urlConnectionToClose).disconnect();
+ }
+ throw e;
+ }
+
+ return is;
+ }
+
private void validateUri(String uri) throws IllegalArgumentException {
String[] s = uri.split(":");
if (s.length < 2) {
@@ -434,4 +492,14 @@ public class StreamConsumer extends DefaultConsumer
implements Runnable {
return exchange;
}
+ private static void closeURLConnection(URLConnection con) {
+ if (con instanceof HttpURLConnection) {
+ try {
+ ((HttpURLConnection) con).disconnect();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+
}
diff --git
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
index 8ca1b54341d..7e27e2e0860 100644
---
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
+++
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
@@ -42,12 +42,16 @@ public class StreamEndpoint extends DefaultEndpoint {
private transient Charset charset;
- @UriPath(enums = "in,out,err,header,file")
+ @UriPath(enums = "in,out,err,header,file,http")
@Metadata(required = true)
private String kind;
@UriParam
private String fileName;
@UriParam(label = "consumer")
+ private String httpUrl;
+ @UriParam(label = "consumer")
+ private String httpHeaders;
+ @UriParam(label = "consumer")
private boolean scanStream;
@UriParam(label = "consumer")
private boolean retry;
@@ -107,7 +111,7 @@ public class StreamEndpoint extends DefaultEndpoint {
}
/**
- * Kind of stream to use such as System.in or System.out.
+ * Kind of stream to use such as System.in, System.out, a file, or a http
url.
*/
public void setKind(String kind) {
this.kind = kind;
@@ -124,6 +128,29 @@ public class StreamEndpoint extends DefaultEndpoint {
this.fileName = fileName;
}
+ public String getHttpUrl() {
+ return httpUrl;
+ }
+
+ /**
+ * When using stream:http format, this option specifies the http url to
stream from.
+ */
+ public void setHttpUrl(String httpUrl) {
+ this.httpUrl = httpUrl;
+ }
+
+ public String getHttpHeaders() {
+ return httpHeaders;
+ }
+
+ /**
+ * When using stream:http format, this option specifies optional http
headers, such as Accept: application/json.
+ * Multiple headers can be separated by comma.
+ */
+ public void setHttpHeaders(String httpHeaders) {
+ this.httpHeaders = httpHeaders;
+ }
+
public long getDelay() {
return delay;
}
diff --git
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
index c52dc8250b3..5c5ed1cdafe 100644
---
a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
+++
b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
@@ -44,7 +44,7 @@ public class StreamProducer extends DefaultProducer {
private static final Logger LOG =
LoggerFactory.getLogger(StreamProducer.class);
- private static final String TYPES = "out,err,file,header,url";
+ private static final String TYPES = "out,err,file,header";
private static final String INVALID_URI = "Invalid uri, valid form:
'stream:{" + TYPES + "}'";
private static final List<String> TYPES_LIST =
Arrays.asList(TYPES.split(","));
private StreamEndpoint endpoint;