Repository: storm
Updated Branches:
  refs/heads/master 5bdf3dd67 -> e7fdd67a6


http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
 
b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
index 9c365ae..e9986df 100644
--- 
a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
+++ 
b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -22,18 +22,15 @@ package org.apache.storm.sql.compiler.backends.trident;
 import com.google.common.collect.ImmutableMap;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.avatica.util.DateTimeUtils;
-import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.sql.SqlTestUtil;
 import org.apache.storm.sql.TestUtils;
-import org.apache.storm.sql.javac.CompilingClassLoader;
 import org.apache.storm.sql.planner.trident.QueryPlanner;
 import org.apache.storm.sql.runtime.ISqlTridentDataSource;
 import org.apache.storm.sql.AbstractTridentProcessor;
 import org.apache.storm.trident.TridentTopology;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -43,10 +40,11 @@ import org.junit.Test;
 import java.time.ZoneOffset;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 
 import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues;
+import static org.junit.Assert.assertEquals;
 
 public class TestPlanCompiler {
   private static LocalCluster cluster;
@@ -82,7 +80,7 @@ public class TestPlanCompiler {
     Fields f = proc.outputStream().getOutputFields();
     proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(),
             f, new TestUtils.MockStateUpdater(), new Fields());
-    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    SqlTestUtil.runTridentTopology(cluster, EXPECTED_VALUE_SIZE, proc, topo);
     Assert.assertArrayEquals(new Values[] { new Values(3), new Values(4)}, 
getCollectedValues().toArray());
   }
 
@@ -98,7 +96,7 @@ public class TestPlanCompiler {
     QueryPlanner planner = new QueryPlanner(state.schema());
     AbstractTridentProcessor proc = planner.compile(data, sql);
     final TridentTopology topo = proc.build();
-    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    SqlTestUtil.runTridentTopology(cluster, EXPECTED_VALUE_SIZE, proc, topo);
     Assert.assertArrayEquals(new Values[] { new Values(4, "abcde", "y")}, 
getCollectedValues().toArray());
   }
 
@@ -118,58 +116,86 @@ public class TestPlanCompiler {
     Fields f = proc.outputStream().getOutputFields();
     proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(),
             f, new TestUtils.MockStateUpdater(), new Fields());
-    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    SqlTestUtil.runTridentTopology(cluster, EXPECTED_VALUE_SIZE, proc, topo);
     Assert.assertArrayEquals(new Values[] { new Values(5) }, 
getCollectedValues().toArray());
   }
 
   @Test
-  public void testCaseStatement() throws Exception {
-    int EXPECTED_VALUE_SIZE = 5;
-    String sql = "SELECT CASE WHEN NAME IN ('a', 'abc', 'abcde') THEN 
UPPER('a') " +
-            "WHEN UPPER(NAME) = 'AB' THEN 'b' ELSE {fn CONCAT(NAME, '#')} END 
FROM FOO";
-    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
+  public void testNested() throws Exception {
+    int EXPECTED_VALUE_SIZE = 1;
+    String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
+            "FROM FOO " +
+            "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
+    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverNestedTable(sql);
 
     final Map<String, ISqlTridentDataSource> data = new HashMap<>();
-    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    data.put("FOO", new TestUtils.MockSqlTridentNestedDataSource());
 
     QueryPlanner planner = new QueryPlanner(state.schema());
     AbstractTridentProcessor proc = planner.compile(data, sql);
     final TridentTopology topo = proc.build();
     Fields f = proc.outputStream().getOutputFields();
     proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, 
new TestUtils.MockStateUpdater(), new Fields());
-    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    SqlTestUtil.runTridentTopology(cluster, EXPECTED_VALUE_SIZE, proc, topo);
 
-    Assert.assertArrayEquals(new Values[]{new Values("A"), new Values("b"), 
new Values("A"), new Values("abcd#"), new Values("A")}, 
getCollectedValues().toArray());
+    Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
+    Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
+    Assert.assertArrayEquals(new Values[]{new Values(2, 4, nestedMap, 
Arrays.asList(100, 200, 300))}, getCollectedValues().toArray());
   }
 
+  /**
+   * All the binary literal tests are done here, because Avatica converts the 
result to byte[]
+   * whereas Trident provides the result as a ByteString, so the two 
implementations have different semantics.
+   */
   @Test
-  public void testNested() throws Exception {
+  public void testBinaryStringFunctions() throws Exception {
     int EXPECTED_VALUE_SIZE = 1;
-    String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " +
+    String sql = "SELECT x'45F0AB' || x'45F0AB', " +
+            "POSITION(x'F0' IN x'453423F0ABBC'), " +
+            "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3), " +
+            "SUBSTRING(x'453423F0ABBC' FROM 3), " +
+            "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4) " +
             "FROM FOO " +
-            "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200";
-    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverNestedTable(sql);
+            "WHERE ID > 0 AND ID < 2";
 
-    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
-    data.put("FOO", new TestUtils.MockSqlTridentNestedDataSource());
+    TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
 
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
     QueryPlanner planner = new QueryPlanner(state.schema());
     AbstractTridentProcessor proc = planner.compile(data, sql);
     final TridentTopology topo = proc.build();
     Fields f = proc.outputStream().getOutputFields();
     proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, 
new TestUtils.MockStateUpdater(), new Fields());
-    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    SqlTestUtil.runTridentTopology(cluster, EXPECTED_VALUE_SIZE, proc, topo);
 
-    Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4);
-    Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map);
-    Assert.assertArrayEquals(new Values[]{new Values(2, 4, nestedMap, 
Arrays.asList(100, 200, 300))}, getCollectedValues().toArray());
+    List<Object> v = getCollectedValues().get(0);
+
+    assertEquals("45f0ab45f0ab", v.get(0).toString());
+    assertEquals(4, v.get(1));
+    assertEquals("45344534abbc45", v.get(2).toString());
+    assertEquals("23f0abbc", v.get(3).toString());
+    assertEquals("23f0abbc", v.get(4).toString());
   }
 
+  /**
+   * All the date/time/timestamp related tests are done here, because Avatica 
converts the result of date functions to java.sql classes
+   * whereas Trident provides a long type, so the two implementations have 
different semantics.
+   */
   @Test
-  public void testDateKeywords() throws Exception {
+  public void testDateKeywordsAndFunctions() throws Exception {
     int EXPECTED_VALUE_SIZE = 1;
     String sql = "SELECT " +
-            "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, 
CURRENT_DATE " +
+            "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, 
CURRENT_DATE, " +
+            "DATE '1970-05-15' AS datefield, TIME '00:00:00' AS timefield, 
TIMESTAMP '2016-01-01 00:00:00' as timestampfield, " +
+            "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56')," +
+            "FLOOR(DATE '2016-01-23' TO MONTH)," +
+            "CEIL(TIME '12:34:56' TO MINUTE)," +
+            "{fn CURDATE()} = CURRENT_DATE, {fn CURTIME()} = LOCALTIME, {fn 
NOW()} = LOCALTIMESTAMP," +
+            "{fn QUARTER(DATE '2016-10-07')}, {fn TIMESTAMPADD(MINUTE, 15, 
TIMESTAMP '2016-10-07 00:00:00')}," +
+            "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 00:00:00', 
TIMESTAMP '2016-10-07 00:00:00')}," +
+            "INTERVAL '1-5' YEAR TO MONTH AS intervalfield, " +
+            "(DATE '1970-01-01', DATE '1970-01-15') AS anchoredinterval_field 
"   +
             "FROM FOO " +
             "WHERE ID > 0 AND ID < 2";
     TestCompilerUtils.CalciteState state = 
TestCompilerUtils.sqlOverDummyTable(sql);
@@ -182,7 +208,7 @@ public class TestPlanCompiler {
     final TridentTopology topo = proc.build();
     Fields f = proc.outputStream().getOutputFields();
     proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, 
new TestUtils.MockStateUpdater(), new Fields());
-    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    SqlTestUtil.runTridentTopology(cluster, EXPECTED_VALUE_SIZE, proc, topo);
 
     long utcTimestamp = (long) 
dataContext.get(DataContext.Variable.UTC_TIMESTAMP.camelName);
     long currentTimestamp = (long) 
dataContext.get(DataContext.Variable.CURRENT_TIMESTAMP.camelName);
@@ -195,38 +221,9 @@ public class TestPlanCompiler {
     int localTimeInt = (int) (localTimestamp % DateTimeUtils.MILLIS_PER_DAY);
     int currentTimeInt = (int) (currentTimestamp % 
DateTimeUtils.MILLIS_PER_DAY);
 
-    Assert.assertArrayEquals(new Values[]{new Values(localTimeInt, 
currentTimeInt, localTimestamp, currentTimestamp, dateInt)}, 
getCollectedValues().toArray());
+    Assert.assertArrayEquals(new Values[]{new Values(localTimeInt, 
currentTimeInt, localTimestamp, currentTimestamp, dateInt,
+                    134, 0, 1451606400000L, 1L, 0L, 45300000, true, true, 
true, 4L, 1475799300000L, 86400, 17, 0, 14)},
+            getCollectedValues().toArray());
   }
 
-  private void runTridentTopology(final int expectedValueSize, 
AbstractTridentProcessor proc,
-                                  TridentTopology topo) throws Exception {
-    final Config conf = new Config();
-    conf.setMaxSpoutPending(20);
-
-    if (proc.getClassLoaders() != null && proc.getClassLoaders().size() > 0) {
-      CompilingClassLoader lastClassloader = 
proc.getClassLoaders().get(proc.getClassLoaders().size() - 1);
-      Utils.setClassLoaderForJavaDeSerialize(lastClassloader);
-    }
-
-    try (LocalTopology stormTopo = cluster.submitTopology("storm-sql", conf, 
topo.build())) {
-      waitForCompletion(1000 * 1000, new Callable<Boolean>() {
-        @Override
-        public Boolean call() throws Exception {
-          return getCollectedValues().size() < expectedValueSize;
-        }
-      });
-    } finally {
-      while(cluster.getClusterInfo().get_topologies_size() > 0) {
-        Thread.sleep(10);
-      }
-      Utils.resetClassLoaderForJavaDeSerialize();
-    }
-  }
-
-  private void waitForCompletion(long timeout, Callable<Boolean> cond) throws 
Exception {
-    long start = TestUtils.monotonicNow();
-    while (TestUtils.monotonicNow() - start < timeout && cond.call()) {
-      Thread.sleep(100);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
 
b/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
index f918c29..70ede5b 100644
--- 
a/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
+++ 
b/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java
@@ -33,7 +33,6 @@ import 
org.apache.storm.hdfs.trident.format.SimpleFileNameFormat;
 import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
 import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy;
 import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
-import org.apache.storm.sql.runtime.DataSource;
 import org.apache.storm.sql.runtime.DataSourcesProvider;
 import org.apache.storm.sql.runtime.FieldInfo;
 import org.apache.storm.sql.runtime.IOutputSerializer;
@@ -121,12 +120,6 @@ public class HdfsDataSourcesProvider implements 
DataSourcesProvider {
     }
 
     @Override
-    public DataSource construct(URI uri, String inputFormatClass, String 
outputFormatClass,
-                                List<FieldInfo> fields) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
                                                   Properties properties, 
List<FieldInfo> fields) {
         List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
 
b/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
index b0e6530..81b94b1 100644
--- 
a/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
+++ 
b/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
@@ -37,7 +37,6 @@ import 
org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
 import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
 import org.apache.storm.spout.Scheme;
 import org.apache.storm.spout.SchemeAsMultiScheme;
-import org.apache.storm.sql.runtime.DataSource;
 import org.apache.storm.sql.runtime.DataSourcesProvider;
 import org.apache.storm.sql.runtime.FieldInfo;
 import org.apache.storm.sql.runtime.IOutputSerializer;
@@ -124,12 +123,6 @@ public class KafkaDataSourcesProvider implements 
DataSourcesProvider {
     }
 
     @Override
-    public DataSource construct(URI uri, String inputFormatClass, String 
outputFormatClass,
-                                List<FieldInfo> fields) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
                                                   Properties properties, 
List<FieldInfo> fields) {
         int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
 
b/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
index f3682b8..e82f9b6 100644
--- 
a/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
+++ 
b/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
@@ -28,7 +28,6 @@ import org.apache.storm.mongodb.common.mapper.MongoMapper;
 import org.apache.storm.mongodb.trident.state.MongoState;
 import org.apache.storm.mongodb.trident.state.MongoStateFactory;
 import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
-import org.apache.storm.sql.runtime.DataSource;
 import org.apache.storm.sql.runtime.DataSourcesProvider;
 import org.apache.storm.sql.runtime.FieldInfo;
 import org.apache.storm.sql.runtime.IOutputSerializer;
@@ -112,12 +111,6 @@ public class MongoDataSourcesProvider implements 
DataSourcesProvider {
     }
 
     @Override
-    public DataSource construct(URI uri, String inputFormatClass, String 
outputFormatClass,
-                                List<FieldInfo> fields) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public ISqlTridentDataSource constructTrident(URI uri, String 
inputFormatClass, String outputFormatClass,
                                                   Properties properties, 
List<FieldInfo> fields) {
         List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
 
b/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
index b020a15..8dbdd74 100644
--- 
a/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
+++ 
b/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java
@@ -35,7 +35,6 @@ import org.apache.storm.redis.trident.state.RedisClusterState;
 import org.apache.storm.redis.trident.state.RedisClusterStateUpdater;
 import org.apache.storm.redis.trident.state.RedisState;
 import org.apache.storm.redis.trident.state.RedisStateUpdater;
-import org.apache.storm.sql.runtime.DataSource;
 import org.apache.storm.sql.runtime.DataSourcesProvider;
 import org.apache.storm.sql.runtime.FieldInfo;
 import org.apache.storm.sql.runtime.IOutputSerializer;
@@ -148,11 +147,6 @@ public class RedisDataSourcesProvider implements 
DataSourcesProvider {
     }
 
     @Override
-    public DataSource construct(URI uri, String inputFormatClass, String 
outputFormatClass, List<FieldInfo> fields) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public ISqlTridentDataSource constructTrident(
             URI uri, String inputFormatClass, String outputFormatClass, 
Properties props, List<FieldInfo> fields) {
         Preconditions.checkArgument(JedisURIHelper.isValid(uri), "URI is not 
valid for Redis: " + uri);

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
 
b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
deleted file mode 100644
index effdf55..0000000
--- 
a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.tuple.Values;
-
-public abstract class AbstractChannelHandler implements ChannelHandler {
-    @Override
-    public abstract void dataReceived(ChannelContext ctx, Values data);
-
-    @Override
-    public void channelInactive(ChannelContext ctx) {
-
-    }
-
-    @Override
-    public void exceptionCaught(Throwable cause) {
-
-    }
-
-    @Override
-    public void flush(ChannelContext ctx) {
-        ctx.flush();
-    }
-
-    @Override
-    public void setSource(ChannelContext ctx, Object source) {
-
-    }
-
-    public static final AbstractChannelHandler PASS_THROUGH = new 
AbstractChannelHandler() {
-        @Override
-        public void dataReceived(ChannelContext ctx, Values data) {
-            ctx.emit(data);
-        }
-    };
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
 
b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
deleted file mode 100644
index 1449325..0000000
--- 
a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime;
-
-import java.util.Map;
-
-import org.apache.storm.tuple.Values;
-
-/**
- * Subclass of AbstractTupleProcessor provides a series of tuple. It
- * takes a series of iterators of {@link Values} and produces a stream of
- * tuple.
- * <p/>
- * The subclass implements the {@see next()} method to provide
- * the output of the stream. It can choose to return null in {@see next()} to
- * indicate that this particular iteration is a no-op. SQL processors depend
- * on this semantic to implement filtering and nullable records.
- */
-public abstract class AbstractValuesProcessor {
-
-    /**
-     * Initialize the data sources.
-     *
-     * @param data a map from the table name to the iterators of the values.
-     *
-     */
-    public abstract void initialize(Map<String, DataSource> data, 
ChannelHandler
-            result);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
 
b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
deleted file mode 100644
index 9d6f662..0000000
--- 
a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.tuple.Values;
-
-public interface ChannelContext {
-    /**
-     * Emit data to the next stage of the data pipeline.
-     */
-    void emit(Values data);
-
-    void fireChannelInactive();
-
-    void flush();
-
-    void setSource(Object source);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
 
b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
deleted file mode 100644
index 3009df4..0000000
--- 
a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.tuple.Values;
-
-/**
- * DataListener provides an event-driven interface for the user to process
- * series of events.
- */
-public interface ChannelHandler {
-    void dataReceived(ChannelContext ctx, Values data);
-
-    /**
-     * The producer of the data has indicated that the channel is no longer
-     * active.
-     * @param ctx ChannelContext
-     */
-    void channelInactive(ChannelContext ctx);
-
-    void exceptionCaught(Throwable cause);
-
-    void flush(ChannelContext ctx);
-
-    void setSource(ChannelContext ctx, Object source);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java 
b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
deleted file mode 100644
index d389c38..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime;
-
-import org.apache.storm.tuple.Values;
-
-public class Channels {
-    private static final ChannelContext VOID_CTX = new ChannelContext() {
-        @Override
-        public void emit(Values data) {}
-
-        @Override
-        public void fireChannelInactive() {}
-
-        @Override
-        public void flush() {
-
-        }
-
-        @Override
-        public void setSource(java.lang.Object source) {
-
-        }
-    };
-
-    private static class ChannelContextAdapter implements ChannelContext {
-        private final ChannelHandler handler;
-        private final ChannelContext next;
-
-        public ChannelContextAdapter(
-                ChannelContext next, ChannelHandler handler) {
-            this.handler = handler;
-            this.next = next;
-        }
-
-        @Override
-        public void emit(Values data) {
-            handler.dataReceived(next, data);
-        }
-
-        @Override
-        public void fireChannelInactive() {
-            handler.channelInactive(next);
-        }
-
-        @Override
-        public void flush() {
-            handler.flush(next);
-        }
-
-        @Override
-        public void setSource(java.lang.Object source) {
-            handler.setSource(next, source);
-            next.setSource(source); // propagate through the chain
-        }
-    }
-
-    private static class ForwardingChannelContext implements ChannelContext {
-        private final ChannelContext next;
-
-        public ForwardingChannelContext(ChannelContext next) {
-            this.next = next;
-        }
-
-        @Override
-        public void emit(Values data) {
-            next.emit(data);
-        }
-
-        @Override
-        public void fireChannelInactive() {
-            next.fireChannelInactive();
-        }
-
-        @Override
-        public void flush() {
-            next.flush();
-        }
-
-        @Override
-        public void setSource(Object source) {
-            next.setSource(source);
-        }
-    }
-
-    public static ChannelContext chain(
-            ChannelContext next, ChannelHandler handler) {
-        return new ChannelContextAdapter(next, handler);
-    }
-
-    public static ChannelContext voidContext() {
-        return VOID_CTX;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
----------------------------------------------------------------------
diff --git 
a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java 
b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
deleted file mode 100644
index b90b4f5..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime;
-
-/**
- * A DataSource ingests data in StormSQL. It provides a series of tuple to
- * the downstream {@link ChannelHandler}.
- *
- */
-public interface DataSource {
-    void open(ChannelContext ctx);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
index baec6cd..d89e815 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
@@ -29,20 +29,6 @@ public interface DataSourcesProvider {
      */
     String scheme();
 
-    /**
-     * Construct a new data source.
-     * @param uri The URI that specifies the data source. The format of the URI
-     *            is fully customizable.
-     * @param inputFormatClass the name of the class that deserializes data.
-     *                         It is null when unspecified.
-     * @param outputFormatClass the name of the class that serializes data. It
-     *                          is null when unspecified.
-     * @param fields The name of the fields and the schema of the table.
-     */
-    DataSource construct(
-            URI uri, String inputFormatClass, String outputFormatClass,
-            List<FieldInfo> fields);
-
     ISqlTridentDataSource constructTrident(
             URI uri, String inputFormatClass, String outputFormatClass,
             Properties properties, List<FieldInfo> fields);

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
index 9d8368a..7383235 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
@@ -47,25 +47,6 @@ public class DataSourcesRegistry {
     }
 
     /**
-     * Construct a data source.
-     * @param uri data source uri
-     * @param inputFormatClass input format class
-     * @param outputFormatClass output format class
-     * @param fields fields info list
-     * @return DataSource object
-     */
-    public static DataSource construct(
-            URI uri, String inputFormatClass, String outputFormatClass,
-            List<FieldInfo> fields) {
-        DataSourcesProvider provider = providers.get(uri.getScheme());
-        if (provider == null) {
-            return null;
-        }
-
-        return provider.construct(uri, inputFormatClass, outputFormatClass, fields);
-    }
-
-    /**
      * Construct a trident data source.
      * @param uri data source uri
      * @param inputFormatClass input format class

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
deleted file mode 100644
index 095e6ba..0000000
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.sql.runtime;
-
-public class StormSqlFunctions {
-
-    /**
-     * Whether the object equals the other one.
-     * @param b0 one object
-     * @param b1 the other object
-     * @return true if the object equals the other one
-     */
-    public static Boolean eq(Object b0, Object b1) {
-        if (b0 == null || b1 == null) {
-            return null;
-        }
-        return b0.equals(b1);
-    }
-
-    /**
-     * Whether the object dose not equals the other one.
-     * @param b0 one object
-     * @param b1 the other object
-     * @return true if the object dose not equals the other one
-     */
-    public static Boolean ne(Object b0, Object b1) {
-        if (b0 == null || b1 == null) {
-            return null;
-        }
-        return !b0.equals(b1);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
index fe4d024..10e470d 100644
--- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
+++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Properties;
 
 import org.apache.storm.spout.Scheme;
-import org.apache.storm.sql.runtime.DataSource;
 import org.apache.storm.sql.runtime.DataSourcesProvider;
 import org.apache.storm.sql.runtime.FieldInfo;
 import org.apache.storm.sql.runtime.IOutputSerializer;
@@ -79,11 +78,6 @@ public class SocketDataSourcesProvider implements DataSourcesProvider {
     }
 
     @Override
-    public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, List<FieldInfo> fields) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
                                                   Properties properties, List<FieldInfo> fields) {
         String host = uri.getHost();

http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
index 05587ba..4c945fa 100644
--- a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
+++ b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -19,9 +19,6 @@
  */
 package org.apache.storm.sql;
 
-import org.apache.storm.sql.runtime.ChannelContext;
-import org.apache.storm.sql.runtime.ChannelHandler;
-import org.apache.storm.sql.runtime.DataSource;
 import org.apache.storm.sql.runtime.ISqlTridentDataSource;
 import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer;
 import org.apache.storm.task.IMetricsContext;
@@ -63,7 +60,6 @@ public class TestUtils {
     }
   }
 
-
   public static class TopN {
     public static PriorityQueue<Integer> init() {
       return new PriorityQueue<>();
@@ -89,107 +85,6 @@ public class TestUtils {
     }
   }
 
-
-  public static class MockDataSource implements DataSource {
-    private final ArrayList<Values> RECORDS = new ArrayList<>();
-
-    public MockDataSource() {
-      for (int i = 0; i < 5; ++i) {
-        RECORDS.add(new Values(i, "x", null));
-      }
-    }
-
-    @Override
-    public void open(ChannelContext ctx) {
-      for (Values v : RECORDS) {
-        ctx.emit(v);
-      }
-      ctx.fireChannelInactive();
-    }
-  }
-
-  public static class MockGroupDataSource implements DataSource {
-    private final ArrayList<Values> RECORDS = new ArrayList<>();
-
-    public MockGroupDataSource() {
-      for (int i = 0; i < 10; ++i) {
-        RECORDS.add(new Values(i/3, i, (i+1)* 0.5, "x", i/2));
-      }
-    }
-
-    @Override
-    public void open(ChannelContext ctx) {
-      for (Values v : RECORDS) {
-        ctx.emit(v);
-      }
-      // force evaluation of the aggregate function on the last group
-      ctx.flush();
-      ctx.fireChannelInactive();
-    }
-  }
-
-  public static class MockEmpDataSource implements DataSource {
-    private final ArrayList<Values> RECORDS = new ArrayList<>();
-
-    public MockEmpDataSource() {
-      RECORDS.add(new Values(1, "emp1", 1));
-      RECORDS.add(new Values(2, "emp2", 1));
-      RECORDS.add(new Values(3, "emp3", 2));
-    }
-
-    @Override
-    public void open(ChannelContext ctx) {
-      for (Values v : RECORDS) {
-        ctx.emit(v);
-      }
-      ctx.flush();
-      ctx.fireChannelInactive();
-    }
-  }
-
-  public static class MockDeptDataSource implements DataSource {
-    private final ArrayList<Values> RECORDS = new ArrayList<>();
-
-    public MockDeptDataSource() {
-      RECORDS.add(new Values(1, "dept1"));
-      RECORDS.add(new Values(2, "dept2"));
-      RECORDS.add(new Values(3, "dept3"));
-    }
-
-    @Override
-    public void open(ChannelContext ctx) {
-      for (Values v : RECORDS) {
-        ctx.emit(v);
-      }
-      ctx.flush();
-      ctx.fireChannelInactive();
-    }
-  }
-
-  public static class MockNestedDataSource implements DataSource {
-    private final ArrayList<Values> RECORDS = new ArrayList<>();
-
-    public MockNestedDataSource() {
-      List<Integer> ints = Arrays.asList(100, 200, 300);
-      for (int i = 0; i < 5; ++i) {
-        Map<String, Integer> map = new HashMap<>();
-        map.put("b", i);
-        map.put("c", i*i);
-        Map<String, Map<String, Integer>> mm = new HashMap<>();
-        mm.put("a", map);
-        RECORDS.add(new Values(i, map, mm, ints));
-      }
-    }
-
-    @Override
-    public void open(ChannelContext ctx) {
-      for (Values v : RECORDS) {
-        ctx.emit(v);
-      }
-      ctx.fireChannelInactive();
-    }
-  }
-
   public static class MockState implements State {
     /**
      * Collect all values in a static variable as the instance will go through serialization and deserialization.
@@ -244,6 +139,65 @@ public class TestUtils {
     }
   }
 
+  public static class MockSqlExprDataSource implements ISqlTridentDataSource {
+    @Override
+    public IBatchSpout getProducer() {
+      return new MockSqlExprDataSource.MockSpout();
+    }
+
+    @Override
+    public SqlTridentConsumer getConsumer() {
+      return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater());
+    }
+
+    private static class MockSpout implements IBatchSpout {
+      private final ArrayList<Values> RECORDS = new ArrayList<>();
+      private final Fields OUTPUT_FIELDS = new Fields("ID", "NAME", "ADDR");
+
+      public MockSpout() {
+        for (int i = 0; i < 5; ++i) {
+          RECORDS.add(new Values(i, "x", null));
+        }
+      }
+
+      private boolean emitted = false;
+
+      @Override
+      public void open(Map<String, Object> conf, TopologyContext context) {
+      }
+
+      @Override
+      public void emitBatch(long batchId, TridentCollector collector) {
+        if (emitted) {
+          return;
+        }
+
+        for (Values r : RECORDS) {
+          collector.emit(r);
+        }
+        emitted = true;
+      }
+
+      @Override
+      public void ack(long batchId) {
+      }
+
+      @Override
+      public void close() {
+      }
+
+      @Override
+      public Map<String, Object> getComponentConfiguration() {
+        return null;
+      }
+
+      @Override
+      public Fields getOutputFields() {
+        return OUTPUT_FIELDS;
+      }
+    }
+  }
+
   public static class MockSqlTridentDataSource implements ISqlTridentDataSource {
     @Override
     public IBatchSpout getProducer() {
@@ -550,32 +504,6 @@ public class TestUtils {
     }
   }
 
-  public static class CollectDataChannelHandler implements ChannelHandler {
-    private final List<Values> values;
-
-    public CollectDataChannelHandler(List<Values> values) {
-      this.values = values;
-    }
-
-    @Override
-    public void dataReceived(ChannelContext ctx, Values data) {
-      values.add(data);
-    }
-
-    @Override
-    public void channelInactive(ChannelContext ctx) {}
-
-    @Override
-    public void exceptionCaught(Throwable cause) {
-      throw new RuntimeException(cause);
-    }
-
-    @Override
-    public void flush(ChannelContext ctx) {}
-
-    @Override
-    public void setSource(ChannelContext ctx, Object source) {}
-  }
 
   public static long monotonicNow() {
     final long NANOSECONDS_PER_MILLISECOND = 1000000;

Reply via email to