Repository: tajo
Updated Branches:
  refs/heads/branch-0.11.0 f0e22b76a -> 20c7e184c


http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index a4a916a..f7aff30 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -33,7 +33,6 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.physical.EvalExprExec;
@@ -63,11 +62,14 @@ import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.session.Session;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.tuple.memory.MemoryBlock;
+import org.apache.tajo.tuple.memory.MemoryRowBlock;
 import org.apache.tajo.util.ProtoUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -217,21 +219,27 @@ public class QueryExecutor {
 
     Schema schema = new Schema();
     schema.addColumn("explain", TajoDataTypes.Type.TEXT);
-    RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
 
     SerializedResultSet.Builder serializedResBuilder = 
SerializedResultSet.newBuilder();
-
-    VTuple tuple = new VTuple(1);
+    MemoryRowBlock rowBlock = new 
MemoryRowBlock(SchemaUtil.toDataTypes(schema));
     String[] lines = explainStr.split("\n");
-    int bytesNum = 0;
-    for (String line : lines) {
-      tuple.put(0, DatumFactory.createText(line));
-      byte [] encodedData = encoder.toBytes(tuple);
-      bytesNum += encodedData.length;
-      
serializedResBuilder.addSerializedTuples(ByteString.copyFrom(encodedData));
+    try {
+      for (String line : lines) {
+        rowBlock.getWriter().startRow();
+        rowBlock.getWriter().putText(line);
+        rowBlock.getWriter().endRow();
+      }
+      MemoryBlock memoryBlock = rowBlock.getMemory();
+      ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, 
memoryBlock.readableBytes());
+      int uncompressedLength = uncompressed.remaining();
+
+      serializedResBuilder.setDecompressedLength(uncompressedLength);
+      
serializedResBuilder.setSerializedTuples(ByteString.copyFrom(uncompressed));
+      serializedResBuilder.setSchema(schema.getProto());
+      serializedResBuilder.setRows(rowBlock.rows());
+    } finally {
+      rowBlock.release();
     }
-    serializedResBuilder.setSchema(schema.getProto());
-    serializedResBuilder.setBytesNum(bytesNum);
 
     QueryInfo queryInfo = 
context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query,
         (LogicalRootNode) plan.getRootBlock().getRoot());
@@ -328,12 +336,23 @@ public class QueryExecutor {
         insertRowValues(queryContext, insertNode, responseBuilder);
       } else {
         Schema schema = PlannerUtil.targetToSchema(targets);
-        RowStoreUtil.RowStoreEncoder encoder = 
RowStoreUtil.createEncoder(schema);
-        byte[] serializedBytes = encoder.toBytes(outTuple);
         SerializedResultSet.Builder serializedResBuilder = 
SerializedResultSet.newBuilder();
-        
serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes));
-        serializedResBuilder.setSchema(schema.getProto());
-        serializedResBuilder.setBytesNum(serializedBytes.length);
+        MemoryRowBlock rowBlock = new 
MemoryRowBlock(SchemaUtil.toDataTypes(schema));
+
+        try {
+          rowBlock.getWriter().addTuple(outTuple);
+
+          MemoryBlock memoryBlock = rowBlock.getMemory();
+          ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, 
memoryBlock.readableBytes());
+          int uncompressedLength = uncompressed.remaining();
+
+          serializedResBuilder.setDecompressedLength(uncompressedLength);
+          
serializedResBuilder.setSerializedTuples(ByteString.copyFrom(uncompressed));
+          serializedResBuilder.setSchema(schema.getProto());
+          serializedResBuilder.setRows(rowBlock.rows());
+        } finally {
+          rowBlock.release();
+        }
 
         QueryInfo queryInfo = 
context.getQueryJobManager().createNewSimpleQuery(queryContext, session, query,
             (LogicalRootNode) plan.getRootBlock().getRoot());

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 1f98457..68916e4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -772,8 +772,8 @@ public class Stage implements EventHandler<StageEvent> {
     // if store plan (i.e., CREATE or INSERT OVERWRITE)
     String storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan());
     if (storeType == null) {
-      // get default or store type
-      storeType = "TEXT";
+      // get final output store type (i.e., SELECT)
+      storeType = channel.getStoreType();
     }
 
     schema = channel.getSchema();

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
index 85dbdbe..9e35b3b 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
@@ -18,28 +18,25 @@
 
 package org.apache.tajo.jdbc;
 
-import com.google.protobuf.ServiceException;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.client.CatalogAdminClient;
 import org.apache.tajo.client.QueryClient;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
-import org.apache.tajo.client.v2.exception.ClientConnectionException;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.error.Errors;
+import org.apache.tajo.exception.SQLExceptionUtil;
+import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.error.Errors.ResultCode;
 import org.apache.tajo.exception.*;
 import org.apache.tajo.jdbc.util.QueryStringDecoder;
 import org.apache.tajo.rpc.RpcUtils;
 import org.apache.tajo.util.KeyValueSet;
 
-import java.io.IOException;
-import java.net.ConnectException;
 import java.net.URI;
 import java.sql.*;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -106,6 +103,7 @@ public class JdbcConnection implements Connection {
       }
 
       params = new QueryStringDecoder(rawURI).getParameters();
+      properties.putAll(params);
     } catch (SQLException se) {
       throw se;
     } catch (Throwable t) { // for unexpected exceptions like ArrayIndexOutOfBoundsException.
@@ -115,7 +113,11 @@ public class JdbcConnection implements Connection {
     clientProperties = new KeyValueSet();
     if(properties != null) {
       for(Map.Entry<Object, Object> entry: properties.entrySet()) {
-        clientProperties.set(entry.getKey().toString(), 
entry.getValue().toString());
+        if(entry.getValue() instanceof Collection) {
+          clientProperties.set(entry.getKey().toString(), 
StringUtils.join((Collection) entry.getValue(), ","));
+        } else {
+          clientProperties.set(entry.getKey().toString(), 
entry.getValue().toString());
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
index 0c83fd0..4c926bb 100644
--- a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
+++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
@@ -21,23 +21,24 @@
  */
 package org.apache.tajo.jdbc;
 
-import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TpchTestBase;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.TajoProtos.CodecType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.tuple.memory.MemoryBlock;
+import org.apache.tajo.tuple.memory.MemoryRowBlock;
+import org.apache.tajo.util.CompressionUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -45,9 +46,9 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.sql.*;
 import java.util.Calendar;
-import java.util.List;
 import java.util.TimeZone;
 
 import static org.junit.Assert.*;
@@ -60,7 +61,7 @@ public class TestResultSet {
   private static FileTablespace sm;
   private static TableMeta scoreMeta;
   private static Schema scoreSchema;
-  private static List<ByteString> serializedData;
+  private static MemoryRowBlock rowBlock;
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -72,13 +73,12 @@ public class TestResultSet {
     scoreSchema.addColumn("deptname", Type.TEXT);
     scoreSchema.addColumn("score", Type.INT4);
     scoreMeta = CatalogUtil.newTableMeta("TEXT");
+    rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(scoreSchema));
     TableStats stats = new TableStats();
 
     Path p = new Path(sm.getTableUri("default", "score"));
     sm.getFileSystem().mkdirs(p);
     Appender appender = sm.getAppender(scoreMeta, scoreSchema, new Path(p, 
"score"));
-    RowStoreUtil.RowStoreEncoder encoder = 
RowStoreUtil.createEncoder(scoreSchema);
-    serializedData = Lists.newArrayList();
     appender.init();
 
     int deptSize = 100;
@@ -92,7 +92,7 @@ public class TestResultSet {
       tuple.put(1, DatumFactory.createInt4(i + 1));
       written += key.length() + Integer.SIZE;
       appender.addTuple(tuple);
-      serializedData.add(ByteString.copyFrom(encoder.toBytes(tuple)));
+      rowBlock.getWriter().addTuple(tuple);
     }
     appender.close();
     stats.setNumRows(tupleNum);
@@ -103,18 +103,67 @@ public class TestResultSet {
     desc = new 
TableDesc(CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "score"),
         scoreSchema, scoreMeta, p.toUri());
     desc.setStats(stats);
+    assertEquals(tupleNum, rowBlock.rows());
   }
 
   @AfterClass
   public static void terminate() throws IOException {
-
+    rowBlock.release();
   }
 
   @Test
   public void testMemoryResultSet() throws Exception {
-    TajoMemoryResultSet rs = new TajoMemoryResultSet(null, desc.getSchema(),
-        serializedData, desc.getStats().getNumRows().intValue(), null);
+    SerializedResultSet.Builder resultSetBuilder = 
SerializedResultSet.newBuilder();
+    resultSetBuilder.setSchema(scoreSchema.getProto());
+    resultSetBuilder.setRows(rowBlock.rows());
+
+    MemoryBlock memoryBlock = rowBlock.getMemory();
+    ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, 
memoryBlock.readableBytes());
+    resultSetBuilder.setSerializedTuples(ByteString.copyFrom(uncompressed));
+    resultSetBuilder.setDecompressedLength(memoryBlock.readableBytes());
+
+    TajoMemoryResultSet rs = new TajoMemoryResultSet(null, desc.getSchema(), 
resultSetBuilder.build(), null);
+
+    ResultSetMetaData meta = rs.getMetaData();
+    assertNotNull(meta);
+    Schema schema = scoreSchema;
+    assertEquals(schema.size(), meta.getColumnCount());
+    for (int i = 0; i < meta.getColumnCount(); i++) {
+      assertEquals(schema.getColumn(i).getSimpleName(), meta.getColumnName(i + 
1));
+      assertEquals(schema.getColumn(i).getQualifier(), meta.getTableName(i + 
1));
+    }
+
+    int i = 0;
+    assertTrue(rs.isBeforeFirst());
+    for (; rs.next(); i++) {
+      assertEquals("test"+i%100, rs.getString(1));
+      assertEquals("test"+i%100, rs.getString("deptname"));
+      assertEquals(i+1, rs.getInt(2));
+      assertEquals(i+1, rs.getInt("score"));
+    }
+    assertEquals(10000, i);
+    assertTrue(rs.isAfterLast());
+  }
+
+
+  @Test
+  public void testCompressedMemoryResultSet() throws Exception {
+    SerializedResultSet.Builder resultSetBuilder = 
SerializedResultSet.newBuilder();
+    resultSetBuilder.setSchema(scoreSchema.getProto());
+    resultSetBuilder.setRows(rowBlock.rows());
+
+    MemoryBlock memoryBlock = rowBlock.getMemory();
+    byte[] uncompressedBytes = new byte[memoryBlock.readableBytes()];
+    memoryBlock.getBuffer().getBytes(0, uncompressedBytes);
+
+    // compress by snappy
+    byte[] compressedBytes = CompressionUtil.compress(CodecType.SNAPPY, 
uncompressedBytes);
+    resultSetBuilder.setDecompressedLength(uncompressedBytes.length);
+    resultSetBuilder.setDecompressCodec(CodecType.SNAPPY);
+    resultSetBuilder.setSerializedTuples(ByteString.copyFrom(compressedBytes));
+
 
+    TajoMemoryResultSet rs = new TajoMemoryResultSet(null, desc.getSchema(), 
resultSetBuilder.build(), null);
     ResultSetMetaData meta = rs.getMetaData();
     assertNotNull(meta);
     Schema schema = scoreSchema;

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
index 40d7e58..a860d51 100644
--- a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
+++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
@@ -194,6 +194,91 @@ public class TestTajoJdbc extends QueryTestCaseBase {
   }
 
   @Test
+  public void testResultSetCompression() throws Exception {
+    String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), 
tajoMasterAddress.getPort(),
+        TajoConstants.DEFAULT_DATABASE_NAME);
+    connUri = connUri + "?" + SessionVars.COMPRESSED_RESULT_TRANSFER.keyname() 
+ "=true";
+    Connection conn = DriverManager.getConnection(connUri);
+    assertTrue(conn.isValid(100));
+
+    PreparedStatement stmt = null;
+    ResultSet res = null;
+    try {
+      /*
+      test data set
+      1,17.0,N
+      1,36.0,N
+      2,38.0,N
+      3,45.0,R
+      3,49.0,R
+      */
+
+      String sql =
+          "select l_orderkey, l_quantity, l_returnflag from lineitem where l_quantity > ? and l_returnflag = ?";
+
+      stmt = conn.prepareStatement(sql);
+
+      stmt.setInt(1, 20);
+      stmt.setString(2, "N");
+
+      res = stmt.executeQuery();
+
+      ResultSetMetaData rsmd = res.getMetaData();
+      assertEquals(3, rsmd.getColumnCount());
+      assertEquals("l_orderkey", rsmd.getColumnName(1));
+      assertEquals("l_quantity", rsmd.getColumnName(2));
+      assertEquals("l_returnflag", rsmd.getColumnName(3));
+
+      try {
+        int numRows = 0;
+        String[] resultData = {"136.0N", "238.0N"};
+        while (res.next()) {
+          assertEquals(resultData[numRows],
+              ("" + res.getObject(1).toString() + res.getObject(2).toString() 
+ res.getObject(3).toString()));
+          numRows++;
+        }
+        assertEquals(2, numRows);
+      } finally {
+        res.close();
+      }
+
+      stmt.setInt(1, 20);
+      stmt.setString(2, "R");
+
+      res = stmt.executeQuery();
+
+      rsmd = res.getMetaData();
+      assertEquals(3, rsmd.getColumnCount());
+      assertEquals("l_orderkey", rsmd.getColumnName(1));
+      assertEquals("l_quantity", rsmd.getColumnName(2));
+      assertEquals("l_returnflag", rsmd.getColumnName(3));
+
+      try {
+        int numRows = 0;
+        String[] resultData = {"345.0R", "349.0R"};
+        while (res.next()) {
+          assertEquals(resultData[numRows],
+              ("" + res.getObject(1).toString() + res.getObject(2).toString() 
+ res.getObject(3).toString()));
+          numRows++;
+        }
+        assertEquals(2, numRows);
+      } finally {
+        res.close();
+      }
+    } finally {
+      if (res != null) {
+        res.close();
+      }
+      if (stmt != null) {
+        stmt.close();
+      }
+      if (conn != null) {
+        conn.close();
+      }
+    }
+  }
+
+  @Test
   public void testDatabaseMetaDataGetTable() throws Exception {
     String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), 
tajoMasterAddress.getPort(),
       TajoConstants.DEFAULT_DATABASE_NAME);
@@ -584,9 +669,8 @@ public class TestTajoJdbc extends QueryTestCaseBase {
     try {
       assertTrue("should have result set", statement.execute());
       TajoResultSetBase result = (TajoResultSetBase) statement.getResultSet();
-      Thread.sleep(1000);   // todo query master is not killed properly if it's compiling the query (use 100, if you want see)
       statement.cancel();
-
+      Thread.sleep(1000);
       QueryStatus status = client.getQueryStatus(result.getQueryId());
       assertEquals(TajoProtos.QueryState.QUERY_KILLED, status.getState());
     } finally {

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PersistentStoreNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PersistentStoreNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PersistentStoreNode.java
index c892eda..a28f01f 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PersistentStoreNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PersistentStoreNode.java
@@ -21,6 +21,7 @@ package org.apache.tajo.plan.logical;
 
 import com.google.gson.annotations.Expose;
 
+import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.plan.PlanString;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
@@ -31,7 +32,7 @@ import org.apache.tajo.util.TUtil;
  * This includes some basic information for materializing data.
  */
 public abstract class PersistentStoreNode extends UnaryNode implements 
Cloneable {
-  @Expose protected String storageType = "TEXT";
+  @Expose protected String storageType = BuiltinStorages.TEXT;
   @Expose protected KeyValueSet options = new KeyValueSet();
 
   protected PersistentStoreNode(int pid, NodeType nodeType) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index b6a13af..903976f 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -1153,6 +1153,11 @@
         <artifactId>json-path</artifactId>
         <version>2.0.0</version>
       </dependency>
+      <dependency>
+        <groupId>org.iq80.snappy</groupId>
+        <artifactId>snappy</artifactId>
+        <version>0.4</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
   <profiles>

http://git-wip-us.apache.org/repos/asf/tajo/blob/20c7e184/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
index aa7ba67..598c8e8 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -183,11 +183,13 @@ public class RpcClientManager {
   public static void close() {
     LOG.info("Closing RPC client manager");
 
-    for (NettyClientBase eachClient : clients.values()) {
-      try {
-        eachClient.close();
-      } catch (Exception e) {
-        LOG.error(e.getMessage(), e);
+    synchronized (clients) {
+      for (NettyClientBase eachClient : clients.values()) {
+        try {
+          eachClient.close();
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        }
       }
     }
   }

Reply via email to