This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e8f7413  AMBARI-23077. LogFeeder: create socket input. (#2260)
e8f7413 is described below

commit e8f7413cb646d5f9062eb2d929ebc02810dc4d72
Author: Olivér Szabó <[email protected]>
AuthorDate: Fri Sep 7 10:46:09 2018 +0200

    AMBARI-23077. LogFeeder: create socket input. (#2260)
    
    * AMBARI-23077. LogFeeder: create socket input.
    
    * AMBARI-23077. Throw exception if port is missing from input descriptor.
    
    * AMBARI-23077. fix debug logs
    
    * AMBARI-23077. Fixes based on the PR review
---
 .../src/test/resources/log4j.properties            |   9 +-
 .../model/inputconfig/InputSocketDescriptor.java   |  31 ++++
 .../json/model/inputconfig/impl/InputAdapter.java  |   2 +
 .../impl/InputSocketDescriptorImpl.java            | 111 ++++++++++++++
 .../config/zookeeper/LogSearchConfigServerZK.java  |   2 -
 .../ambari/logfeeder/plugin/input/Input.java       |  10 +-
 .../apache/ambari/logfeeder/input/InputFile.java   |  19 +--
 .../apache/ambari/logfeeder/input/InputSocket.java | 167 +++++++++++++++++++++
 .../ambari/logfeeder/input/InputSocketMarker.java  |  71 +++++++++
 .../apache/ambari/logfeeder/output/OutputSolr.java |   6 +-
 .../src/main/resources/alias_config.json           |   5 +-
 .../shipper-conf/input.config-sample.json          |  18 +++
 .../model/common/LSServerInputConfig.java          |   4 +
 .../model/common/LSServerInputSocket.java          |  81 ++++++++++
 14 files changed, 515 insertions(+), 21 deletions(-)

diff --git 
a/ambari-logsearch/ambari-logsearch-appender/src/test/resources/log4j.properties
 
b/ambari-logsearch/ambari-logsearch-appender/src/test/resources/log4j.properties
index 78fb66b..18ee520 100644
--- 
a/ambari-logsearch/ambari-logsearch-appender/src/test/resources/log4j.properties
+++ 
b/ambari-logsearch/ambari-logsearch-appender/src/test/resources/log4j.properties
@@ -13,7 +13,7 @@
 # log4j configuration used during build and unit tests
 
 # Root logger option
-log4j.rootLogger=INFO, logsearchJson
+log4j.rootLogger=INFO, logsearchJson, socket
 
 # Redirect log messages to a logsearch json 
 
log4j.appender.logsearchJson=org.apache.ambari.logsearch.appender.LogsearchRollingFileAppender
@@ -21,4 +21,9 @@ log4j.appender.logsearchJson.File=target/jsonlog/log.json
 log4j.appender.logsearchJson.maxFileSize=10MB
 log4j.appender.logsearchJson.maxBackupIndex=10
 log4j.appender.logsearchJson.Append=true
-log4j.appender.logsearchJson.layout=org.apache.ambari.logsearch.appender.LogsearchConversion
\ No newline at end of file
+log4j.appender.logsearchJson.layout=org.apache.ambari.logsearch.appender.LogsearchConversion
+
+log4j.appender.socket=org.apache.log4j.net.SocketAppender
+log4j.appender.socket.Port=61999
+log4j.appender.socket.RemoteHost=localhost
+log4j.appender.socket.ReconnectionDelay=10000
\ No newline at end of file
diff --git 
a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputSocketDescriptor.java
 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputSocketDescriptor.java
new file mode 100644
index 0000000..d89e9fc
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputSocketDescriptor.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logsearch.config.api.model.inputconfig;
+
+public interface InputSocketDescriptor extends InputDescriptor { // descriptor contract for inputs deserialized from the "socket" source
+
+  Integer getPort(); // port of the socket input; boxed type, so null is possible when the config omits it
+
+  String getProtocol(); // protocol name of the socket server (e.g. "tcp"); null is possible when unset
+
+  Boolean isSecure(); // whether SSL should be used; boxed type, so null is possible when unset
+
+  Boolean isLog4j(); // whether the input carries Log4j serialized objects; boxed type, so null is possible when unset
+
+}
diff --git 
a/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputAdapter.java
 
b/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputAdapter.java
index aa581e1..d0a4092 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputAdapter.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputAdapter.java
@@ -51,6 +51,8 @@ public class InputAdapter implements 
JsonDeserializer<InputDescriptorImpl> {
         return (InputDescriptorImpl)context.deserialize(json, 
InputFileDescriptorImpl.class);
       case "s3_file":
         return (InputDescriptorImpl)context.deserialize(json, 
InputS3FileDescriptorImpl.class);
+      case "socket":
+        return (InputDescriptorImpl)context.deserialize(json, 
InputSocketDescriptorImpl.class);
       case "custom":
         return (InputDescriptorImpl)context.deserialize(json, 
InputCustomDescriptorImpl.class);
         default:
diff --git 
a/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputSocketDescriptorImpl.java
 
b/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputSocketDescriptorImpl.java
new file mode 100644
index 0000000..69b0068
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-config-json/src/main/java/org/apache/ambari/logsearch/config/json/model/inputconfig/impl/InputSocketDescriptorImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logsearch.config.json.model.inputconfig.impl;
+
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+import org.apache.ambari.logsearch.config.api.ShipperConfigElementDescription;
+import org.apache.ambari.logsearch.config.api.ShipperConfigTypeDescription;
+import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputSocketDescriptor;
+
+@ShipperConfigTypeDescription(
+  name = "Socket Input",
+  description = "Socket (TCP/UDP) inputs have the following parameters in addition to the general parameters:"
+)
+public class InputSocketDescriptorImpl extends InputDescriptorImpl implements InputSocketDescriptor { // Gson-mapped descriptor of a socket input
+
+  @ShipperConfigElementDescription(
+    path = "/input/[]/port",
+    type = "int",
+    description = "Unique port for specific socket input",
+    examples = {"61999"}
+  )
+  @Expose
+  @SerializedName("port")
+  private Integer port; // no default value is declared; stays null when the JSON has no "port"
+
+  @ShipperConfigElementDescription(
+    path = "/input/[]/protocol",
+    type = "string", // the element is textual ("tcp" / "udp"); it was previously mis-declared as "int"
+    description = "Protocol type for socket server (tcp / udp - udp is not supported right now)",
+    examples = {"udp", "tcp"},
+    defaultValue = "tcp"
+  )
+  @Expose
+  @SerializedName("protocol")
+  private String protocol; // stays null when the JSON has no "protocol"; the documented default is "tcp"
+
+  @ShipperConfigElementDescription(
+    path = "/input/[]/secure",
+    type = "boolean",
+    description = "Use SSL",
+    examples = {"true"},
+    defaultValue = "false"
+  )
+  @Expose
+  @SerializedName("secure")
+  private Boolean secure; // stays null when the JSON has no "secure"; the documented default is "false"
+
+  @ShipperConfigElementDescription(
+    path = "/input/[]/log4j",
+    type = "boolean",
+    description = "Use Log4j serialized objects (e.g.: SocketAppender)",
+    examples = {"true"},
+    defaultValue = "false"
+  )
+  @Expose
+  @SerializedName("log4j")
+  private Boolean log4j; // stays null when the JSON has no "log4j"; the documented default is "false"
+
+  @Override
+  public Integer getPort() { // returns the raw configured value, no default applied here
+    return this.port;
+  }
+
+  @Override
+  public String getProtocol() { // returns the raw configured value, no default applied here
+    return this.protocol;
+  }
+
+  @Override
+  public Boolean isSecure() { // returns the raw configured value, no default applied here
+    return this.secure;
+  }
+
+  @Override
+  public Boolean isLog4j() { // returns the raw configured value, no default applied here
+    return this.log4j;
+  }
+
+  public void setPort(Integer port) {
+    this.port = port;
+  }
+
+  public void setProtocol(String protocol) {
+    this.protocol = protocol;
+  }
+
+  public void setSecure(Boolean secure) {
+    this.secure = secure;
+  }
+
+  public void setLog4j(Boolean log4j) {
+    this.log4j = log4j;
+  }
+}
diff --git 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
index 546e4d4..7380c6b 100644
--- 
a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
+++ 
b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigServerZK.java
@@ -26,8 +26,6 @@ import java.util.TreeMap;
 
 import org.apache.ambari.logsearch.config.api.LogLevelFilterManager;
 import org.apache.ambari.logsearch.config.api.LogSearchConfigServer;
-import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
-import 
org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
 import 
org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputAdapter;
 import 
org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfigGson;
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
index ed0edcd..1b1fed5 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
@@ -38,11 +38,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-public abstract class Input<PROP_TYPE extends LogFeederProperties, 
INPUT_MARKER extends InputMarker> extends ConfigItem<PROP_TYPE> implements 
Runnable {
+public abstract class Input<PROP_TYPE extends LogFeederProperties, 
INPUT_MARKER extends InputMarker, INPUT_DESC_TYPE extends InputDescriptor> 
extends ConfigItem<PROP_TYPE> implements Runnable {
 
   private static final Logger LOG = LoggerFactory.getLogger(Input.class);
 
-  private InputDescriptor inputDescriptor;
+  private INPUT_DESC_TYPE inputDescriptor;
   private PROP_TYPE logFeederProperties;
   private LogSearchConfigLogFeeder logSearchConfig;
   private InputManager inputManager;
@@ -60,7 +60,7 @@ public abstract class Input<PROP_TYPE extends 
LogFeederProperties, INPUT_MARKER
   private boolean initDefaultFields;
   protected MetricData readBytesMetric = new 
MetricData(getReadBytesMetricName(), false);
 
-  public void loadConfigs(InputDescriptor inputDescriptor, PROP_TYPE 
logFeederProperties,
+  public void loadConfigs(INPUT_DESC_TYPE inputDescriptor, PROP_TYPE 
logFeederProperties,
                           InputManager inputManager, OutputManager 
outputManager) {
     this.inputDescriptor = inputDescriptor;
     this.logFeederProperties = logFeederProperties;
@@ -94,7 +94,7 @@ public abstract class Input<PROP_TYPE extends 
LogFeederProperties, INPUT_MARKER
     return logFeederProperties;
   }
 
-  public InputDescriptor getInputDescriptor() {
+  public INPUT_DESC_TYPE getInputDescriptor() {
     return inputDescriptor;
   }
 
@@ -215,7 +215,7 @@ public abstract class Input<PROP_TYPE extends 
LogFeederProperties, INPUT_MARKER
     }
   }
 
-  public void loadConfig(InputDescriptor inputDescriptor) {
+  public void loadConfig(INPUT_DESC_TYPE inputDescriptor) {
     this.inputDescriptor = inputDescriptor;
   }
 
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 85df2ee..c31f404 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -44,7 +44,7 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.util.*;
 
-public class InputFile extends Input<LogFeederProps, InputFileMarker> {
+public class InputFile extends Input<LogFeederProps, InputFileMarker, 
InputFileBaseDescriptor> {
 
   private static final Logger LOG = LoggerFactory.getLogger(InputFile.class);
 
@@ -223,18 +223,19 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker> {
     super.init(logFeederProps);
     LOG.info("init() called");
 
+    InputFileDescriptor inputFileDescriptor = (InputFileDescriptor) 
getInputDescriptor(); // cast as InputS3 uses InputFileBaseDescriptor
     checkPointExtension = logFeederProps.getCheckPointExtension();
-    checkPointIntervalMS = (int) 
ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)getInputDescriptor()).getCheckpointIntervalMs(),
 DEFAULT_CHECKPOINT_INTERVAL_MS);
-    detachIntervalMin = (int) 
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDetachIntervalMin(),
 DEFAULT_DETACH_INTERVAL_MIN * 60);
-    detachTimeMin = (int) 
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDetachTimeMin(),
 DEFAULT_DETACH_TIME_MIN * 60);
-    pathUpdateIntervalMin = (int) 
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getPathUpdateIntervalMin(),
 DEFAULT_LOG_PATH_UPDATE_INTERVAL_MIN * 60);
-    maxAgeMin = (int) 
ObjectUtils.defaultIfNull(((InputFileDescriptor)getInputDescriptor()).getMaxAgeMin(),
 0);
-    boolean initDefaultFields = 
BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().isInitDefaultFields(), 
false);
+    checkPointIntervalMS = (int) 
ObjectUtils.defaultIfNull(inputFileDescriptor.getCheckpointIntervalMs(), 
DEFAULT_CHECKPOINT_INTERVAL_MS);
+    detachIntervalMin = (int) 
ObjectUtils.defaultIfNull(inputFileDescriptor.getDetachIntervalMin(), 
DEFAULT_DETACH_INTERVAL_MIN * 60);
+    detachTimeMin = (int) 
ObjectUtils.defaultIfNull(inputFileDescriptor.getDetachTimeMin(), 
DEFAULT_DETACH_TIME_MIN * 60);
+    pathUpdateIntervalMin = (int) 
ObjectUtils.defaultIfNull(inputFileDescriptor.getPathUpdateIntervalMin(), 
DEFAULT_LOG_PATH_UPDATE_INTERVAL_MIN * 60);
+    maxAgeMin = (int) 
ObjectUtils.defaultIfNull(inputFileDescriptor.getMaxAgeMin(), 0);
+    boolean initDefaultFields = 
BooleanUtils.toBooleanDefaultIfNull(inputFileDescriptor.isInitDefaultFields(), 
false);
     setInitDefaultFields(initDefaultFields);
 
     // Let's close the file and set it to true after we start monitoring it
     setClosed(true);
-    dockerLog = 
BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)getInputDescriptor()).getDockerEnabled(),
 false);
+    dockerLog = 
BooleanUtils.toBooleanDefaultIfNull(inputFileDescriptor.getDockerEnabled(), 
false);
     if (dockerLog) {
       if (logFeederProps.isDockerContainerRegistryEnabled()) {
         boolean isFileReady = isReady();
@@ -279,7 +280,7 @@ public class InputFile extends Input<LogFeederProps, 
InputFileMarker> {
 
   @Override
   public void start() throws Exception {
-    boolean isProcessFile = 
BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)getInputDescriptor()).getProcessFile(),
 true);
+    boolean isProcessFile = 
BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().getProcessFile(), 
true);
     if (isProcessFile) {
       for (int i = logFiles.length - 1; i >= 0; i--) {
         File file = logFiles[i];
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java
new file mode 100644
index 0000000..36b4301
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocket.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logsearch.appender.LogsearchConversion;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputSocketDescriptor;
+import org.apache.commons.lang.ObjectUtils;
+import org.apache.log4j.spi.LoggingEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLServerSocketFactory;
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.ObjectInputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+
+/**
+ * Input that listens on a server socket and forwards what it receives through {@code outputLine}.
+ * Each accepted connection yields at most one message: either a serialized Log4j {@link LoggingEvent}
+ * (converted to a string by {@link LogsearchConversion}) or a single text line.
+ * NOTE(review): a Log4j SocketAppender presumably keeps its connection open for many events - confirm one per connection is intended.
+ */
+public class InputSocket extends Input<LogFeederProps, InputSocketMarker, InputSocketDescriptor> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(InputSocket.class);
+
+  private ServerSocket serverSocket;
+  private Thread thread;
+  private int port;
+  private String protocol;
+  private boolean secure;
+  private boolean log4j;
+
+  /**
+   * Reads the socket settings from the input descriptor; protocol defaults to "tcp", secure and log4j to false.
+   *
+   * @throws IllegalArgumentException if the descriptor has no port (or the port is 0)
+   */
+  @Override
+  public void init(LogFeederProps logFeederProperties) throws Exception {
+    super.init(logFeederProperties);
+    port = (int) ObjectUtils.defaultIfNull(getInputDescriptor().getPort(), 0);
+    if (port == 0) {
+      throw new IllegalArgumentException(String.format("Port needs to be set for socket input (type: %s)", getInputDescriptor().getType()));
+    }
+
+    protocol = (String) ObjectUtils.defaultIfNull(getInputDescriptor().getProtocol(), "tcp");
+    secure = (boolean) ObjectUtils.defaultIfNull(getInputDescriptor().isSecure(), false);
+    log4j = (boolean) ObjectUtils.defaultIfNull(getInputDescriptor().isLog4j(), false);
+  }
+
+  /** Starts a new thread with this input as its Runnable; returns false only when {@link #isReady()} is false. */
+  @Override
+  public boolean monitor() {
+    if (isReady()) {
+      LOG.info("Start monitoring socket thread...");
+      thread = new Thread(this, getNameForThread());
+      thread.start();
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Opens the server socket (SSL factory when secure) and serves connections one at a time until drained.
+   * A {@link SocketException} - such as the one accept() throws once the server socket is closed - ends the loop with a warning.
+   */
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting socket server (port: {}, protocol: {}, secure: {})", port, protocol, secure);
+    ServerSocketFactory socketFactory = secure ? SSLServerSocketFactory.getDefault() : ServerSocketFactory.getDefault();
+    InputSocketMarker inputSocketMarker = new InputSocketMarker(this, port, protocol, secure, log4j);
+    LogsearchConversion loggerConverter = new LogsearchConversion();
+
+    try {
+      serverSocket = socketFactory.createServerSocket(port);
+      while (!isDrain()) {
+        // Own the accepted socket here so it is closed even when wrapping its stream fails.
+        try (Socket socket = serverSocket.accept()) {
+          if (log4j) {
+            // NOTE(review): this deserializes Java objects from any peer that can reach the port - restrict access or add an ObjectInputFilter.
+            try (ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(socket.getInputStream()))) {
+              LoggingEvent loggingEvent = (LoggingEvent) ois.readObject();
+              String jsonStr = loggerConverter.createOutput(loggingEvent);
+              LOG.trace("Incoming socket logging event: {}", jsonStr);
+              outputLine(jsonStr, inputSocketMarker);
+            }
+          } else {
+            try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
+              String line = in.readLine();
+              // readLine() returns null when the peer closes the connection without sending anything.
+              if (line != null) {
+                LOG.trace("Incoming socket message: {}", line);
+                outputLine(line, inputSocketMarker);
+              }
+            }
+          }
+        }
+      }
+    } catch (SocketException socketEx) {
+      LOG.warn("{}", socketEx.getMessage());
+    } finally {
+      // createServerSocket() may have failed, in which case there is nothing to close.
+      if (serverSocket != null) {
+        serverSocket.close();
+      }
+    }
+  }
+
+  /**
+   * Passes the drain flag to the superclass, then closes the server socket (if one was opened) and marks the input closed.
+   */
+  @Override
+  public void setDrain(boolean drain) {
+    super.setDrain(drain);
+    LOG.info("Stopping socket input: {}", getShortDescription());
+    try {
+      // The server socket is only assigned inside start(), so it can still be null here.
+      if (serverSocket != null) {
+        serverSocket.close();
+      }
+      setClosed(true);
+    } catch (Exception e) {
+      LOG.error("Error during closing socket input.", e);
+    }
+  }
+
+  /** Builds the thread name from the log type, protocol and port. */
+  @Override
+  public String getNameForThread() {
+    return String.format("socket=%s-%s-%s", getLogType(), this.protocol, this.port);
+  }
+
+  /** Builds a short description from the log type, port and protocol. */
+  @Override
+  public String getShortDescription() {
+    return String.format("%s - (port: %d, protocol: %s)", getLogType(), port, protocol);
+  }
+
+  /** Always reports ready. */
+  @Override
+  public boolean isReady() {
+    return true;
+  }
+
+  /** Always returns null; the marker used for output is created locally in {@link #start()}. */
+  @Override
+  public InputSocketMarker getInputMarker() {
+    return null;
+  }
+
+  /** No-op for socket inputs. */
+  @Override
+  public void setReady(boolean isReady) {
+  }
+
+  /** No-op for socket inputs. */
+  @Override
+  public void checkIn(InputSocketMarker inputMarker) {
+  }
+
+  /** No-op for socket inputs. */
+  @Override
+  public void lastCheckIn() {
+  }
+
+  /** Always returns null. */
+  @Override
+  public String getReadBytesMetricName() {
+    return null;
+  }
+
+  /** Always returns null. */
+  @Override
+  public String getStatMetricName() {
+    return null;
+  }
+
+  /** Always returns false. */
+  @Override
+  public boolean logConfigs() {
+    return false;
+  }
+}
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocketMarker.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocketMarker.java
new file mode 100644
index 0000000..983cd19
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSocketMarker.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input;
+
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class InputSocketMarker implements InputMarker<InputSocket>{ // marker carrying the socket settings of the input that produced a line
+  private final InputSocket input;
+  private final Integer port;
+  private final String protocol;
+  private final Boolean secure;
+  private final Boolean log4j;
+
+  private final Map<String, Object> properties = new HashMap<>(); // backing map handed out by getAllProperties()
+
+  public InputSocketMarker(InputSocket input, Integer port, String protocol, 
+Boolean secure, Boolean log4j) {
+    this.input = input;
+    this.port = port;
+    this.protocol = protocol;
+    this.secure = secure;
+    this.log4j = log4j;
+    properties.put("port", port); // the four settings are mirrored into the generic property map
+    properties.put("secure", secure);
+    properties.put("protocol", protocol);
+    properties.put("log4j", log4j); // note: no "file_key" entry is ever added by this marker
+  }
+
+  public InputSocket getInput() { // the input instance passed to the constructor
+    return input;
+  }
+
+  @Override
+  public Map<String, Object> getAllProperties() { // returns the internal map itself, not a copy
+    return this.properties;
+  }
+
+  public Integer getPort() {
+    return port;
+  }
+
+  public String getProtocol() {
+    return protocol;
+  }
+
+  public Boolean isSecure() {
+    return secure;
+  }
+
+  public Boolean isLog4j() {
+    return log4j;
+  }
+}
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index 278a7f5..350986e 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -428,8 +428,10 @@ public class OutputSolr extends Output<LogFeederProps, 
InputMarker> {
               Level.ERROR);
         }
       }
-      
latestInputMarkers.put(outputData.inputMarker.getAllProperties().get("file_key").toString(),
-        outputData.inputMarker);
+      Object fileKey = 
outputData.inputMarker.getAllProperties().get("file_key");
+      if (fileKey != null) {
+        latestInputMarkers.put(fileKey.toString(), outputData.inputMarker);
+      }
       localBuffer.add(document);
     }
 
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
index 3da5b0a..229a9b6 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
@@ -1,5 +1,5 @@
 {
-"input": {
+  "input": {
     "file": {
       "klass": "org.apache.ambari.logfeeder.input.InputFile"
     },
@@ -8,6 +8,9 @@
     },
     "simulate": {
       "klass": "org.apache.ambari.logfeeder.input.InputSimulate"
+    },
+    "socket": {
+      "klass": "org.apache.ambari.logfeeder.input.InputSocket"
     }
   },
   "filter": {
diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log-samples/shipper-conf/input.config-sample.json
 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log-samples/shipper-conf/input.config-sample.json
index 3951e4b..690bb29 100644
--- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log-samples/shipper-conf/input.config-sample.json
+++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/log-samples/shipper-conf/input.config-sample.json
@@ -5,6 +5,14 @@
       "rowtype": "service",
       "path": "target/classes/log-samples/logs/service_sample.txt",
       "group": "Ambari"
+    },
+    {
+      "type": "service_socket",
+      "rowtype": "service",
+      "port": 61999,
+      "protocol" : "tcp",
+      "source" : "socket",
+      "log4j": "true"
     }
   ],
   "filter": [
@@ -27,6 +35,16 @@
           }
         }
       }
+    },
+    {
+      "filter": "json",
+      "conditions": {
+        "fields": {
+          "type": [
+            "service_socket"
+          ]
+        }
+      }
     }
   ]
 }
diff --git 
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputConfig.java
 
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputConfig.java
index 81c4593..1c4939f 100644
--- 
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputConfig.java
+++ 
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputConfig.java
@@ -38,6 +38,7 @@ import 
com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputSocketDescriptor;
 
 @ApiModel
 public class LSServerInputConfig {
@@ -64,6 +65,9 @@ public class LSServerInputConfig {
       } else if (inputDescriptor instanceof InputS3FileDescriptor) {
         LSServerInput inputItem = new LSServerInputS3File(inputDescriptor);
         input.add(inputItem);
+      } else if (inputDescriptor instanceof InputSocketDescriptor) {
+        LSServerInput inputItem = new LSServerInputSocket(inputDescriptor);
+        input.add(inputItem);
       }
     }
     
diff --git 
a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputSocket.java
 
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputSocket.java
new file mode 100644
index 0000000..efe0e3b
--- /dev/null
+++ 
b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/model/common/LSServerInputSocket.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logsearch.model.common;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import 
org.apache.ambari.logsearch.config.api.model.inputconfig.InputSocketDescriptor;
+
+@ApiModel
+public class LSServerInputSocket extends LSServerInput { // server-side JSON model holding the socket-specific input settings
+
+  @JsonProperty("port")
+  private Integer port; // initialised from InputSocketDescriptor#getPort()
+
+  @JsonProperty("protocol")
+  private String protocol; // initialised from InputSocketDescriptor#getProtocol()
+
+  @JsonProperty("secure")
+  private Boolean secure; // initialised from InputSocketDescriptor#isSecure()
+
+  @JsonProperty("log4j")
+  private Boolean log4j; // initialised from InputSocketDescriptor#isLog4j()
+
+  public LSServerInputSocket(InputDescriptor inputDescriptor) {
+    super(inputDescriptor);
+    InputSocketDescriptor inputSocketDescriptor = (InputSocketDescriptor) 
+inputDescriptor; // unchecked narrowing cast: a non-socket descriptor fails here with ClassCastException
+    this.port = inputSocketDescriptor.getPort(); // values are copied as-is, nulls included
+    this.protocol = inputSocketDescriptor.getProtocol();
+    this.secure = inputSocketDescriptor.isSecure();
+    this.log4j = inputSocketDescriptor.isLog4j();
+  }
+
+  public Integer getPort() {
+    return port;
+  }
+
+  public void setPort(Integer port) {
+    this.port = port;
+  }
+
+  public String getProtocol() {
+    return protocol;
+  }
+
+  public void setProtocol(String protocol) {
+    this.protocol = protocol;
+  }
+
+  public Boolean getSecure() {
+    return secure;
+  }
+
+  public void setSecure(Boolean secure) {
+    this.secure = secure;
+  }
+
+  public Boolean getLog4j() {
+    return log4j;
+  }
+
+  public void setLog4j(Boolean log4j) {
+    this.log4j = log4j;
+  }
+}

Reply via email to