This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new e8f7413 AMBARI-23077. LogFeeder: create socket input. (#2260)
e8f7413 is described below
commit e8f7413cb646d5f9062eb2d929ebc02810dc4d72
Author: Olivér Szabó <[email protected]>
AuthorDate: Fri Sep 7 10:46:09 2018 +0200
AMBARI-23077. LogFeeder: create socket input. (#2260)
* AMBARI-23077. LogFeeder: create socket input.
* AMBARI-23077. Throw exception if port is missing from input descriptor.
* AMBARI-23077. fix debug logs
* AMBARI-23077. Fixes based on the PR review
---
.../src/test/resources/log4j.properties | 9 +-
.../model/inputconfig/InputSocketDescriptor.java | 31 ++++
.../json/model/inputconfig/impl/InputAdapter.java | 2 +
.../impl/InputSocketDescriptorImpl.java | 111 ++++++++++++++
.../config/zookeeper/LogSearchConfigServerZK.java | 2 -
.../ambari/logfeeder/plugin/input/Input.java | 10 +-
.../apache/ambari/logfeeder/input/InputFile.java | 19 +--
.../apache/ambari/logfeeder/input/InputSocket.java | 167 +++++++++++++++++++++
.../ambari/logfeeder/input/InputSocketMarker.java | 71 +++++++++
.../apache/ambari/logfeeder/output/OutputSolr.java | 6 +-
.../src/main/resources/alias_config.json | 5 +-
.../shipper-conf/input.config-sample.json | 18 +++
.../model/common/LSServerInputConfig.java | 4 +
.../model/common/LSServerInputSocket.java | 81 ++++++++++
14 files changed, 515 insertions(+), 21 deletions(-)
diff --git
a/ambari-logsearch/ambari-logsearch-appender/src/test/resources/log4j.properties
b/ambari-logsearch/ambari-logsearch-appender/src/test/resources/log4j.properties
index 78fb66b..18ee520 100644
---
a/ambari-logsearch/ambari-logsearch-appender/src/test/resources/log4j.properties
+++
b/ambari-logsearch/ambari-logsearch-appender/src/test/resources/log4j.properties
@@ -13,7 +13,7 @@
# log4j configuration used during build and unit tests
# Root logger option
-log4j.rootLogger=INFO, logsearchJson
+log4j.rootLogger=INFO, logsearchJson, socket
# Redirect log messages to a logsearch json
log4j.appender.logsearchJson=org.apache.ambari.logsearch.appender.LogsearchRollingFileAppender
@@ -21,4 +21,9 @@ log4j.appender.logsearchJson.File=target/jsonlog/log.json
log4j.appender.logsearchJson.maxFileSize=10MB
log4j.appender.logsearchJson.maxBackupIndex=10
log4j.appender.logsearchJson.Append=true
-log4j.appender.logsearchJson.layout=org.apache.ambari.logsearch.appender.LogsearchConversion
\ No newline at end of file
+log4j.appender.logsearchJson.layout=org.apache.ambari.logsearch.appender.LogsearchConversion
+
+log4j.appender.socket=org.apache.log4j.net.SocketAppender
+log4j.appender.socket.Port=61999
+log4j.appender.socket.RemoteHost=localhost
+log4j.appender.socket.ReconnectionDelay=10000
\ No newline at end of file
diff --git
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputSocketDescriptor.java
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputSocketDescriptor.java
new file mode 100644
index 0000000..d89e9fc
--- /dev/null
+++
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputSocketDescriptor.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+public interface InputSocketDescriptor extends InputDescriptor {
+
+ Integer getPort();
+
+ String getProtocol();
+
+ Boolean isSecure();
+
+ Boolean isLog4j();
+
+}
diff --git
a/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputAdapter.java
b/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputAdapter.java
index aa581e1..d0a4092 100644
---
a/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputAdapter.java
+++
b/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputAdapter.java
@@ -51,6 +51,8 @@ public class InputAdapter implements
JsonDeserializer<InputDescriptorImpl> {
return (InputDescriptorImpl)context.deserialize(json,
InputFileDescriptorImpl.class);
case "s3_file":
return (InputDescriptorImpl)context.deserialize(json,
InputS3FileDescriptorImpl.class);
+ case "socket":
+ return (InputDescriptorImpl)context.deserialize(json,
InputSocketDescriptorImpl.class);
case "custom":
return (InputDescriptorImpl)context.deserialize(json,
InputCustomDescriptorImpl.class);
default:
diff --git
a/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputSocketDescriptorImpl.java
b/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputSocketDescriptorImpl.java
new file mode 100644
index 0000000..69b0068
--- /dev/null
+++
b/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputSocketDescriptorImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logsearch.config.json.model.inputconfig.impl;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+import org.apache.ambari.logsearch.config.api.ShipperConfigElementDescription;
+import org.apache.ambari.logsearch.config.api.ShipperConfigTypeDescription;
+import
org.apache.ambari.logsearch.config.api.model.inputconfig.InputSocketDescriptor;
+
+@ShipperConfigTypeDescription(
+ name = "Socket Input",
+ description = "Socket (TCP/UDP) inputs have the following parameters in
addition to the general parameters:"
+)
+public class InputSocketDescriptorImpl extends InputDescriptorImpl implements
InputSocketDescriptor {
+
+ @ShipperConfigElementDescription(
+ path = "/input/[]/port",
+ type = "int",
+ description = "Unique port for specific socket input",
+ examples = {"61999"}
+ )
+ @Expose
+ @SerializedName("port")
+ private Integer port;
+
+ @ShipperConfigElementDescription(
+ path = "/input/[]/protocol",
+ type = "int",
+ description = "Protocol type for socket server (tcp / udp - udp is not
supported right now)",
+ examples = {"udp", "tcp"},
+ defaultValue = "tcp"
+ )
+ @Expose
+ @SerializedName("protocol")
+ private String protocol;
+
+ @ShipperConfigElementDescription(
+ path = "/input/[]/secure",
+ type = "boolean",
+ description = "Use SSL",
+ examples = {"true"},
+ defaultValue = "false"
+ )
+ @Expose
+ @SerializedName("secure")
+ private Boolean secure;
+
+ @ShipperConfigElementDescription(
+ path = "/input/[]/log4j",
+ type = "boolean",
+ description = "Use Log4j serialized objects (e.g.: SocketAppender)",
+ examples = {"true"},
+ defaultValue = "false"
+ )
+ @Expose
+ @SerializedName("log4j")
+ private Boolean log4j;
+
+ @Override
+ public Integer getPort() {
+ return this.port;
+ }
+
+ @Override
+ public String getProtocol() {
+ return this.protocol;
+ }
+
+ @Override
+ public Boolean isSecure() {
+ return this.secure;
+ }
+
+ @Override
+ public Boolean isLog4j() {
+ return this.log4j;
+ }
+
+ public void setPort(Integer port) {
+ this.port = port;
+ }
+
+ public void setProtocol(String protocol) {
+ this.protocol = protocol;
+ }
+
+ public void setSecure(Boolean secure) {
+ this.secure = secure;
+ }
+
+ public void setLog4j(Boolean log4j) {
+ this.log4j = log4j;
+ }
+}
diff --git
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
index 546e4d4..7380c6b 100644
---
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
+++
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
@@ -26,8 +26,6 @@ import java.util.TreeMap;
import org.apache.ambari.logsearch.config.api.LogLevelFilterManager;
import org.apache.ambari.logsearch.config.api.LogSearchConfigServer;
-import
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
-import
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
import
org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputAdapter;
import
org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfigGson;
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
index ed0edcd..1b1fed5 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
@@ -38,11 +38,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-public abstract class Input<PROP_TYPE extends LogFeederProperties,
INPUT_MARKER extends InputMarker> extends ConfigItem<PROP_TYPE> implements
Runnable {
+public abstract class Input<PROP_TYPE extends LogFeederProperties,
INPUT_MARKER extends InputMarker, INPUT_DESC_TYPE extends InputDescriptor>
extends ConfigItem<PROP_TYPE> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Input.class);
- private InputDescriptor inputDescriptor;
+ private INPUT_DESC_TYPE inputDescriptor;
private PROP_TYPE logFeederProperties;
private LogSearchConfigLogFeeder logSearchConfig;
private InputManager inputManager;
@@ -60,7 +60,7 @@ public abstract class Input<PROP_TYPE extends
LogFeederProperties, INPUT_MARKER
private boolean initDefaultFields;
protected MetricData readBytesMetric = new
MetricData(getReadBytesMetricName(), false);
- public void loadConfigs(InputDescriptor inputDescriptor, PROP_TYPE
logFeederProperties,
+ public void loadConfigs(INPUT_DESC_TYPE inputDescriptor, PROP_TYPE
logFeederProperties,
InputManager inputManager, OutputManager
outputManager) {
this.inputDescriptor = inputDescriptor;
this.logFeederProperties = logFeederProperties;
@@ -94,7 +94,7 @@ public abstract class Input<PROP_TYPE extends
LogFeederProperties, INPUT_MARKER
return logFeederProperties;
}
- public InputDescriptor getInputDescriptor() {
+ public INPUT_DESC_TYPE getInputDescriptor() {
return inputDescriptor;
}
@@ -215,7 +215,7 @@ public abstract class Input<PROP_TYPE extends
LogFeederProperties, INPUT_MARKER
}
}
- public void loadConfig(InputDescriptor inputDescriptor) {
+ public void loadConfig(INPUT_DESC_TYPE inputDescriptor) {
this.inputDescriptor = inputDescriptor;
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 85df2ee..c31f404 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -44,7 +44,7 @@ import java.io.BufferedReader;
import java.io.File;
import java.util.*;
-public class InputFile extends Input<LogFeederProps, InputFileMarker> {
+public class InputFile extends Input<LogFeederProps, InputFileMarker,
InputFileBaseDescriptor> {
private static final Logger LOG = LoggerFactory.getLogger(InputFile.class);
@@ -223,18 +223,19 @@ public class InputFile extends Input<LogFeederProps,
InputFileMarker> {
super.init(logFeederProps);
LOG.info("init() called");
+ InputFileDescriptor inputFileDescriptor = (InputFileDescriptor)
getInputDescriptor(); // cast as InputS3 uses InputFileBaseDescriptor
checkPointExtension = logFeederProps.getCheckPointExtension();
- checkPointIntervalMS = (int)
ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)getInputDescriptor()).getCheckpointIntervalMs(),
DEFAULT_CHECKPOINT_INTERVAL_MS);
- detachIntervalMin = (int)
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDetachIntervalMin(),
DEFAULT_DETACH_INTERVAL_MIN * 60);
- detachTimeMin = (int)
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDetachTimeMin(),
DEFAULT_DETACH_TIME_MIN * 60);
- pathUpdateIntervalMin = (int)
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getPathUpdateIntervalMin(),
DEFAULT_LOG_PATH_UPDATE_INTERVAL_MIN * 60);
- maxAgeMin = (int)
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getMaxAgeMin(),
0);
- boolean initDefaultFields =
BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().isInitDefaultFields(),
false);
+ checkPointIntervalMS = (int)
ObjectUtils.defaultIfNull(inputFileDescriptor.getCheckpointIntervalMs(),
DEFAULT_CHECKPOINT_INTERVAL_MS);
+ detachIntervalMin = (int)
ObjectUtils.defaultIfNull(inputFileDescriptor.getDetachIntervalMin(),
DEFAULT_DETACH_INTERVAL_MIN * 60);
+ detachTimeMin = (int)
ObjectUtils.defaultIfNull(inputFileDescriptor.getDetachTimeMin(),
DEFAULT_DETACH_TIME_MIN * 60);
+ pathUpdateIntervalMin = (int)
ObjectUtils.defaultIfNull(inputFileDescriptor.getPathUpdateIntervalMin(),
DEFAULT_LOG_PATH_UPDATE_INTERVAL_MIN * 60);
+ maxAgeMin = (int)
ObjectUtils.defaultIfNull(inputFileDescriptor.getMaxAgeMin(), 0);
+ boolean initDefaultFields =
BooleanUtils.toBooleanDefaultIfNull(inputFileDescriptor.isInitDefaultFields(),
false);
setInitDefaultFields(initDefaultFields);
// Let's close the file and set it to true after we start monitoring it
setClosed(true);
- dockerLog =
BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDockerEnabled(),
false);
+ dockerLog =
BooleanUtils.toBooleanDefaultIfNull(inputFileDescriptor.getDockerEnabled(),
false);
if (dockerLog) {
if (logFeederProps.isDockerContainerRegistryEnabled()) {
boolean isFileReady = isReady();
@@ -279,7 +280,7 @@ public class InputFile extends Input<LogFeederProps,
InputFileMarker> {
@Override
public void start() throws Exception {
- boolean isProcessFile =
BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)getInputDescriptor()).getProcessFile(),
true);
+ boolean isProcessFile =
BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().getProcessFile(),
true);
if (isProcessFile) {
for (int i = logFiles.length - 1; i >= 0; i--) {
File file = logFiles[i];
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java
new file mode 100644
index 0000000..36b4301
--- /dev/null
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logsearch.appender.LogsearchConversion;
+import
org.apache.ambari.logsearch.config.api.model.inputconfig.InputSocketDescriptor;
+import org.apache.commons.lang.ObjectUtils;
+import org.apache.log4j.spi.LoggingEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLServerSocketFactory;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.ObjectInputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+
+public class InputSocket extends Input<LogFeederProps, InputSocketMarker,
InputSocketDescriptor> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InputSocket.class);
+
+ private ServerSocket serverSocket;
+ private Thread thread;
+ private int port;
+ private String protocol;
+ private boolean secure;
+ private boolean log4j;
+
+ @Override
+ public void init(LogFeederProps logFeederProperties) throws Exception {
+ super.init(logFeederProperties);
+ port = (int) ObjectUtils.defaultIfNull(getInputDescriptor().getPort(), 0);
+ if (port == 0) {
+ throw new IllegalArgumentException(String.format("Port needs to be set
for socket input (type: %s)", getInputDescriptor().getType()));
+ }
+
+ protocol = (String)
ObjectUtils.defaultIfNull(getInputDescriptor().getProtocol(), "tcp");
+ secure = (boolean)
ObjectUtils.defaultIfNull(getInputDescriptor().isSecure(), false);
+ log4j = (boolean)
ObjectUtils.defaultIfNull(getInputDescriptor().isLog4j(), false);
+ }
+
+ @Override
+ public boolean monitor() {
+ if (isReady()) {
+ LOG.info("Start monitoring socket thread...");
+ thread = new Thread(this, getNameForThread());
+ thread.start();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void start() throws Exception {
+ LOG.info("Starting socket server (port: {}, protocol: {}, secure: {})",
port, protocol, secure);
+ ServerSocketFactory socketFactory = secure ?
SSLServerSocketFactory.getDefault() : ServerSocketFactory.getDefault();
+ InputSocketMarker inputSocketMarker = new InputSocketMarker(this, port,
protocol, secure, log4j);
+ LogsearchConversion loggerConverter = new LogsearchConversion();
+
+ try {
+ serverSocket = socketFactory.createServerSocket(port);
+ while (!isDrain()) {
+ Socket socket = serverSocket.accept();
+ if (log4j) {
+ try (ObjectInputStream ois = new ObjectInputStream(new
BufferedInputStream(socket.getInputStream()))) {
+ LoggingEvent loggingEvent = (LoggingEvent) ois.readObject();
+ String jsonStr = loggerConverter.createOutput(loggingEvent);
+ LOG.trace("Incoming socket logging event: " + jsonStr);
+ outputLine(jsonStr, inputSocketMarker);
+ }
+ } else {
+ try (BufferedReader in = new BufferedReader(new
InputStreamReader(socket.getInputStream()));) {
+ String line = in.readLine();
+ LOG.trace("Incoming socket message: " + line);
+ outputLine(line, inputSocketMarker);
+ }
+ }
+ }
+ } catch (SocketException socketEx) {
+ LOG.warn("{}", socketEx.getMessage());
+ } finally {
+ serverSocket.close();
+ }
+ }
+
+ @Override
+ public void setDrain(boolean drain) {
+ super.setDrain(drain);
+ LOG.info("Stopping socket input: {}", getShortDescription());
+ try {
+ serverSocket.close();
+ setClosed(true);
+ } catch (Exception e) {
+ LOG.error("Error during closing socket input.", e);
+ }
+ }
+
+ @Override
+ public String getNameForThread() {
+ return String.format("socket=%s-%s-%s", getLogType(), this.protocol,
this.port);
+ }
+
+ @Override
+ public String getShortDescription() {
+ return String.format("%s - (port: %d, protocol: %s)", getLogType(), port,
protocol);
+ }
+
+ @Override
+ public boolean isReady() {
+ return true;
+ }
+
+ @Override
+ public InputSocketMarker getInputMarker() {
+ return null;
+ }
+
+ @Override
+ public void setReady(boolean isReady) {
+ }
+
+ @Override
+ public void checkIn(InputSocketMarker inputMarker) {
+ }
+
+ @Override
+ public void lastCheckIn() {
+ }
+
+ @Override
+ public String getReadBytesMetricName() {
+ return null;
+ }
+
+ @Override
+ public String getStatMetricName() {
+ return null;
+ }
+
+ @Override
+ public boolean logConfigs() {
+ return false;
+ }
+}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocketMarker.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocketMarker.java
new file mode 100644
index 0000000..983cd19
--- /dev/null
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocketMarker.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input;
+
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class InputSocketMarker implements InputMarker<InputSocket>{
+ private final InputSocket input;
+ private final Integer port;
+ private final String protocol;
+ private final Boolean secure;
+ private final Boolean log4j;
+
+ private final Map<String, Object> properties = new HashMap<>();
+
+ public InputSocketMarker(InputSocket input, Integer port, String protocol,
Boolean secure, Boolean log4j) {
+ this.input = input;
+ this.port = port;
+ this.protocol = protocol;
+ this.secure = secure;
+ this.log4j = log4j;
+ properties.put("port", port);
+ properties.put("secure", secure);
+ properties.put("protocol", protocol);
+ properties.put("log4j", log4j);
+ }
+
+ public InputSocket getInput() {
+ return input;
+ }
+
+ @Override
+ public Map<String, Object> getAllProperties() {
+ return this.properties;
+ }
+
+ public Integer getPort() {
+ return port;
+ }
+
+ public String getProtocol() {
+ return protocol;
+ }
+
+ public Boolean isSecure() {
+ return secure;
+ }
+
+ public Boolean isLog4j() {
+ return log4j;
+ }
+}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index 278a7f5..350986e 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -428,8 +428,10 @@ public class OutputSolr extends Output<LogFeederProps,
InputMarker> {
Level.ERROR);
}
}
-
latestInputMarkers.put(outputData.inputMarker.getAllProperties().get("file_key").toString(),
- outputData.inputMarker);
+ Object fileKey =
outputData.inputMarker.getAllProperties().get("file_key");
+ if (fileKey != null) {
+ latestInputMarkers.put(fileKey.toString(), outputData.inputMarker);
+ }
localBuffer.add(document);
}
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
index 3da5b0a..229a9b6 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
@@ -1,5 +1,5 @@
{
-"input": {
+ "input": {
"file": {
"klass": "org.apache.ambari.logfeeder.input.InputFile"
},
@@ -8,6 +8,9 @@
},
"simulate": {
"klass": "org.apache.ambari.logfeeder.input.InputSimulate"
+ },
+ "socket": {
+ "klass": "org.apache.ambari.logfeeder.input.InputSocket"
}
},
"filter": {
diff --git
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log-samples/shipper-conf/input.config-sample.json
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log-samples/shipper-conf/input.config-sample.json
index 3951e4b..690bb29 100644
---
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log-samples/shipper-conf/input.config-sample.json
+++
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log-samples/shipper-conf/input.config-sample.json
@@ -5,6 +5,14 @@
"rowtype": "service",
"path": "target/classes/log-samples/logs/service_sample.txt",
"group": "Ambari"
+ },
+ {
+ "type": "service_socket",
+ "rowtype": "service",
+ "port": 61999,
+ "protocol" : "tcp",
+ "source" : "socket",
+ "log4j": "true"
}
],
"filter": [
@@ -27,6 +35,16 @@
}
}
}
+ },
+ {
+ "filter": "json",
+ "conditions": {
+ "fields": {
+ "type": [
+ "service_socket"
+ ]
+ }
+ }
}
]
}
diff --git
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputConfig.java
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputConfig.java
index 81c4593..1c4939f 100644
---
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputConfig.java
+++
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputConfig.java
@@ -38,6 +38,7 @@ import
com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
+import
org.apache.ambari.logsearch.config.api.model.inputconfig.InputSocketDescriptor;
@ApiModel
public class LSServerInputConfig {
@@ -64,6 +65,9 @@ public class LSServerInputConfig {
} else if (inputDescriptor instanceof InputS3FileDescriptor) {
LSServerInput inputItem = new LSServerInputS3File(inputDescriptor);
input.add(inputItem);
+ } else if (inputDescriptor instanceof InputSocketDescriptor) {
+ LSServerInput inputItem = new LSServerInputSocket(inputDescriptor);
+ input.add(inputItem);
}
}
diff --git
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputSocket.java
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputSocket.java
new file mode 100644
index 0000000..efe0e3b
--- /dev/null
+++
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputSocket.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logsearch.model.common;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import
org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import
org.apache.ambari.logsearch.config.api.model.inputconfig.InputSocketDescriptor;
+
+@ApiModel
+public class LSServerInputSocket extends LSServerInput {
+
+ @JsonProperty("port")
+ private Integer port;
+
+ @JsonProperty("protocol")
+ private String protocol;
+
+ @JsonProperty("secure")
+ private Boolean secure;
+
+ @JsonProperty("log4j")
+ private Boolean log4j;
+
+ public LSServerInputSocket(InputDescriptor inputDescriptor) {
+ super(inputDescriptor);
+ InputSocketDescriptor inputSocketDescriptor = (InputSocketDescriptor)
inputDescriptor;
+ this.port = inputSocketDescriptor.getPort();
+ this.protocol = inputSocketDescriptor.getProtocol();
+ this.secure = inputSocketDescriptor.isSecure();
+ this.log4j = inputSocketDescriptor.isLog4j();
+ }
+
+ public Integer getPort() {
+ return port;
+ }
+
+ public void setPort(Integer port) {
+ this.port = port;
+ }
+
+ public String getProtocol() {
+ return protocol;
+ }
+
+ public void setProtocol(String protocol) {
+ this.protocol = protocol;
+ }
+
+ public Boolean getSecure() {
+ return secure;
+ }
+
+ public void setSecure(Boolean secure) {
+ this.secure = secure;
+ }
+
+ public Boolean getLog4j() {
+ return log4j;
+ }
+
+ public void setLog4j(Boolean log4j) {
+ this.log4j = log4j;
+ }
+}