Repository: storm Updated Branches: refs/heads/master 5bdf3dd67 -> e7fdd67a6
http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java index 9c365ae..e9986df 100644 --- a/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java +++ b/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java @@ -22,18 +22,15 @@ package org.apache.storm.sql.compiler.backends.trident; import com.google.common.collect.ImmutableMap; import org.apache.calcite.DataContext; import org.apache.calcite.avatica.util.DateTimeUtils; -import org.apache.storm.Config; import org.apache.storm.LocalCluster; -import org.apache.storm.LocalCluster.LocalTopology; +import org.apache.storm.sql.SqlTestUtil; import org.apache.storm.sql.TestUtils; -import org.apache.storm.sql.javac.CompilingClassLoader; import org.apache.storm.sql.planner.trident.QueryPlanner; import org.apache.storm.sql.runtime.ISqlTridentDataSource; import org.apache.storm.sql.AbstractTridentProcessor; import org.apache.storm.trident.TridentTopology; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Utils; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -43,10 +40,11 @@ import org.junit.Test; import java.time.ZoneOffset; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import static org.apache.storm.sql.TestUtils.MockState.getCollectedValues; +import static org.junit.Assert.assertEquals; public class TestPlanCompiler { private static LocalCluster cluster; @@ -82,7 +80,7 @@ public class TestPlanCompiler { Fields f = proc.outputStream().getOutputFields(); proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, new TestUtils.MockStateUpdater(), new Fields()); - runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); + SqlTestUtil.runTridentTopology(cluster, EXPECTED_VALUE_SIZE, proc, topo); Assert.assertArrayEquals(new Values[] { new Values(3), new Values(4)}, getCollectedValues().toArray()); } @@ -98,7 +96,7 @@ public class TestPlanCompiler { QueryPlanner planner = new QueryPlanner(state.schema()); AbstractTridentProcessor proc = planner.compile(data, sql); final TridentTopology topo = proc.build(); - runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); + SqlTestUtil.runTridentTopology(cluster, EXPECTED_VALUE_SIZE, proc, topo); Assert.assertArrayEquals(new Values[] { new Values(4, "abcde", "y")}, getCollectedValues().toArray()); } @@ -118,58 +116,86 @@ public class TestPlanCompiler { Fields f = proc.outputStream().getOutputFields(); proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, new TestUtils.MockStateUpdater(), new Fields()); - runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); + SqlTestUtil.runTridentTopology(cluster, EXPECTED_VALUE_SIZE, proc, topo); Assert.assertArrayEquals(new Values[] { new Values(5) }, getCollectedValues().toArray()); } @Test - public void testCaseStatement() throws Exception { - int EXPECTED_VALUE_SIZE = 5; - String sql = "SELECT CASE WHEN NAME IN ('a', 'abc', 'abcde') THEN UPPER('a') " + - "WHEN UPPER(NAME) = 'AB' THEN 'b' ELSE {fn CONCAT(NAME, '#')} END FROM FOO"; - TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); + public void testNested() throws Exception { + int EXPECTED_VALUE_SIZE = 1; + String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " + + "FROM FOO " + + "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200"; + TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql); final Map<String, ISqlTridentDataSource> data = new HashMap<>(); - data.put("FOO", new TestUtils.MockSqlTridentDataSource()); + data.put("FOO", new TestUtils.MockSqlTridentNestedDataSource()); QueryPlanner planner = new QueryPlanner(state.schema()); AbstractTridentProcessor proc = planner.compile(data, sql); final TridentTopology topo = proc.build(); Fields f = proc.outputStream().getOutputFields(); proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, new TestUtils.MockStateUpdater(), new Fields()); - runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); + SqlTestUtil.runTridentTopology(cluster, EXPECTED_VALUE_SIZE, proc, topo); - Assert.assertArrayEquals(new Values[]{new Values("A"), new Values("b"), new Values("A"), new Values("abcd#"), new Values("A")}, getCollectedValues().toArray()); + Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4); + Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map); + Assert.assertArrayEquals(new Values[]{new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300))}, getCollectedValues().toArray()); } + /** + * All the binary literal tests are done here, because Avatica converts the result to byte[] + * whereas Trident provides the result to ByteString which makes different semantic from Trident implementation. + */ @Test - public void testNested() throws Exception { + public void testBinaryStringFunctions() throws Exception { int EXPECTED_VALUE_SIZE = 1; - String sql = "SELECT ID, MAPFIELD['c'], NESTEDMAPFIELD, ARRAYFIELD " + + String sql = "SELECT x'45F0AB' || x'45F0AB', " + + "POSITION(x'F0' IN x'453423F0ABBC'), " + + "OVERLAY(x'453423F0ABBC45' PLACING x'4534' FROM 3), " + + "SUBSTRING(x'453423F0ABBC' FROM 3), " + + "SUBSTRING(x'453423F0ABBC453423F0ABBC' FROM 3 FOR 4) " + "FROM FOO " + - "WHERE NESTEDMAPFIELD['a']['b'] = 2 AND ARRAYFIELD[2] = 200"; - TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverNestedTable(sql); + "WHERE ID > 0 AND ID < 2"; - final Map<String, ISqlTridentDataSource> data = new HashMap<>(); - data.put("FOO", new TestUtils.MockSqlTridentNestedDataSource()); + TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); + final Map<String, ISqlTridentDataSource> data = new HashMap<>(); + data.put("FOO", new TestUtils.MockSqlTridentDataSource()); QueryPlanner planner = new QueryPlanner(state.schema()); AbstractTridentProcessor proc = planner.compile(data, sql); final TridentTopology topo = proc.build(); Fields f = proc.outputStream().getOutputFields(); proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, new TestUtils.MockStateUpdater(), new Fields()); - runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); + SqlTestUtil.runTridentTopology(cluster, EXPECTED_VALUE_SIZE, proc, topo); - Map<String, Integer> map = ImmutableMap.of("b", 2, "c", 4); - Map<String, Map<String, Integer>> nestedMap = ImmutableMap.of("a", map); - Assert.assertArrayEquals(new Values[]{new Values(2, 4, nestedMap, Arrays.asList(100, 200, 300))}, getCollectedValues().toArray()); + List<Object> v = getCollectedValues().get(0); + + assertEquals("45f0ab45f0ab", v.get(0).toString()); + assertEquals(4, v.get(1)); + assertEquals("45344534abbc45", v.get(2).toString()); + assertEquals("23f0abbc", v.get(3).toString()); + assertEquals("23f0abbc", v.get(4).toString()); } + /** + * All the date/time/timestamp related tests are done here, because Avatica converts the result of date functions to java.sql classes + * whereas Trident provides long type which makes different semantic from Trident implementation. + */ @Test - public void testDateKeywords() throws Exception { + public void testDateKeywordsAndFunctions() throws Exception { int EXPECTED_VALUE_SIZE = 1; String sql = "SELECT " + - "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, CURRENT_DATE " + + "LOCALTIME, CURRENT_TIME, LOCALTIMESTAMP, CURRENT_TIMESTAMP, CURRENT_DATE, " + + "DATE '1970-05-15' AS datefield, TIME '00:00:00' AS timefield, TIMESTAMP '2016-01-01 00:00:00' as timestampfield, " + + "EXTRACT(MONTH FROM TIMESTAMP '2010-01-23 12:34:56')," + + "FLOOR(DATE '2016-01-23' TO MONTH)," + + "CEIL(TIME '12:34:56' TO MINUTE)," + + "{fn CURDATE()} = CURRENT_DATE, {fn CURTIME()} = LOCALTIME, {fn NOW()} = LOCALTIMESTAMP," + + "{fn QUARTER(DATE '2016-10-07')}, {fn TIMESTAMPADD(MINUTE, 15, TIMESTAMP '2016-10-07 00:00:00')}," + + "{fn TIMESTAMPDIFF(SECOND, TIMESTAMP '2016-10-06 00:00:00', TIMESTAMP '2016-10-07 00:00:00')}," + + "INTERVAL '1-5' YEAR TO MONTH AS intervalfield, " + + "(DATE '1970-01-01', DATE '1970-01-15') AS anchoredinterval_field " + "FROM FOO " + "WHERE ID > 0 AND ID < 2"; TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql); @@ -182,7 +208,7 @@ public class TestPlanCompiler { final TridentTopology topo = proc.build(); Fields f = proc.outputStream().getOutputFields(); proc.outputStream().partitionPersist(new TestUtils.MockStateFactory(), f, new TestUtils.MockStateUpdater(), new Fields()); - runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo); + SqlTestUtil.runTridentTopology(cluster, EXPECTED_VALUE_SIZE, proc, topo); long utcTimestamp = (long) dataContext.get(DataContext.Variable.UTC_TIMESTAMP.camelName); long currentTimestamp = (long) dataContext.get(DataContext.Variable.CURRENT_TIMESTAMP.camelName); @@ -195,38 +221,9 @@ public class TestPlanCompiler { int localTimeInt = (int) (localTimestamp % DateTimeUtils.MILLIS_PER_DAY); int currentTimeInt = (int) (currentTimestamp % DateTimeUtils.MILLIS_PER_DAY); - Assert.assertArrayEquals(new Values[]{new Values(localTimeInt, currentTimeInt, localTimestamp, currentTimestamp, dateInt)}, getCollectedValues().toArray()); + Assert.assertArrayEquals(new Values[]{new Values(localTimeInt, currentTimeInt, localTimestamp, currentTimestamp, dateInt, + 134, 0, 1451606400000L, 1L, 0L, 45300000, true, true, true, 4L, 1475799300000L, 86400, 17, 0, 14)}, + getCollectedValues().toArray()); } - private void runTridentTopology(final int expectedValueSize, AbstractTridentProcessor proc, - TridentTopology topo) throws Exception { - final Config conf = new Config(); - conf.setMaxSpoutPending(20); - - if (proc.getClassLoaders() != null && proc.getClassLoaders().size() > 0) { - CompilingClassLoader lastClassloader = proc.getClassLoaders().get(proc.getClassLoaders().size() - 1); - Utils.setClassLoaderForJavaDeSerialize(lastClassloader); - } - - try (LocalTopology stormTopo = cluster.submitTopology("storm-sql", conf, topo.build())) { - waitForCompletion(1000 * 1000, new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - return getCollectedValues().size() < expectedValueSize; - } - }); - } finally { - while(cluster.getClusterInfo().get_topologies_size() > 0) { - Thread.sleep(10); - } - Utils.resetClassLoaderForJavaDeSerialize(); - } - } - - private void waitForCompletion(long timeout, Callable<Boolean> cond) throws Exception { - long start = TestUtils.monotonicNow(); - while (TestUtils.monotonicNow() - start < timeout && cond.call()) { - Thread.sleep(100); - } - } } http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java index f918c29..70ede5b 100644 --- a/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java +++ b/sql/storm-sql-external/storm-sql-hdfs/src/jvm/org/apache/storm/sql/hdfs/HdfsDataSourcesProvider.java @@ -33,7 +33,6 @@ import org.apache.storm.hdfs.trident.format.SimpleFileNameFormat; import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy; import org.apache.storm.hdfs.trident.rotation.FileSizeRotationPolicy; import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy; -import org.apache.storm.sql.runtime.DataSource; import org.apache.storm.sql.runtime.DataSourcesProvider; import org.apache.storm.sql.runtime.FieldInfo; import org.apache.storm.sql.runtime.IOutputSerializer; @@ -121,12 +120,6 @@ public class HdfsDataSourcesProvider implements DataSourcesProvider { } @Override - public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, - List<FieldInfo> fields) { - throw new UnsupportedOperationException(); - } - - @Override public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, Properties properties, List<FieldInfo> fields) { List<String> fieldNames = FieldInfoUtils.getFieldNames(fields); http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java index b0e6530..81b94b1 100644 --- a/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java +++ b/sql/storm-sql-external/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java @@ -37,7 +37,6 @@ import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper; import org.apache.storm.kafka.trident.selector.DefaultTopicSelector; import org.apache.storm.spout.Scheme; import org.apache.storm.spout.SchemeAsMultiScheme; -import org.apache.storm.sql.runtime.DataSource; import org.apache.storm.sql.runtime.DataSourcesProvider; import org.apache.storm.sql.runtime.FieldInfo; import org.apache.storm.sql.runtime.IOutputSerializer; @@ -124,12 +123,6 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider { } @Override - public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, - List<FieldInfo> fields) { - throw new UnsupportedOperationException(); - } - - @Override public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, Properties properties, List<FieldInfo> fields) { int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT; http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java index f3682b8..e82f9b6 100644 --- a/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java +++ b/sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java @@ -28,7 +28,6 @@ import org.apache.storm.mongodb.common.mapper.MongoMapper; import org.apache.storm.mongodb.trident.state.MongoState; import org.apache.storm.mongodb.trident.state.MongoStateFactory; import org.apache.storm.mongodb.trident.state.MongoStateUpdater; -import org.apache.storm.sql.runtime.DataSource; import org.apache.storm.sql.runtime.DataSourcesProvider; import org.apache.storm.sql.runtime.FieldInfo; import org.apache.storm.sql.runtime.IOutputSerializer; @@ -112,12 +111,6 @@ public class MongoDataSourcesProvider implements DataSourcesProvider { } @Override - public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, - List<FieldInfo> fields) { - throw new UnsupportedOperationException(); - } - - @Override public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, Properties properties, List<FieldInfo> fields) { List<String> fieldNames = FieldInfoUtils.getFieldNames(fields); http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java b/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java index b020a15..8dbdd74 100644 --- a/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java +++ b/sql/storm-sql-external/storm-sql-redis/src/jvm/org/apache/storm/sql/redis/RedisDataSourcesProvider.java @@ -35,7 +35,6 @@ import org.apache.storm.redis.trident.state.RedisClusterState; import org.apache.storm.redis.trident.state.RedisClusterStateUpdater; import org.apache.storm.redis.trident.state.RedisState; import org.apache.storm.redis.trident.state.RedisStateUpdater; -import org.apache.storm.sql.runtime.DataSource; import org.apache.storm.sql.runtime.DataSourcesProvider; import org.apache.storm.sql.runtime.FieldInfo; import org.apache.storm.sql.runtime.IOutputSerializer; @@ -148,11 +147,6 @@ public class RedisDataSourcesProvider implements DataSourcesProvider { } @Override - public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, List<FieldInfo> fields) { - throw new UnsupportedOperationException(); - } - - @Override public ISqlTridentDataSource constructTrident( URI uri, String inputFormatClass, String outputFormatClass, Properties props, List<FieldInfo> fields) { Preconditions.checkArgument(JedisURIHelper.isValid(uri), "URI is not valid for Redis: " + uri); http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java deleted file mode 100644 index effdf55..0000000 --- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.sql.runtime; - -import org.apache.storm.tuple.Values; - -public abstract class AbstractChannelHandler implements ChannelHandler { - @Override - public abstract void dataReceived(ChannelContext ctx, Values data); - - @Override - public void channelInactive(ChannelContext ctx) { - - } - - @Override - public void exceptionCaught(Throwable cause) { - - } - - @Override - public void flush(ChannelContext ctx) { - ctx.flush(); - } - - @Override - public void setSource(ChannelContext ctx, Object source) { - - } - - public static final AbstractChannelHandler PASS_THROUGH = new AbstractChannelHandler() { - @Override - public void dataReceived(ChannelContext ctx, Values data) { - ctx.emit(data); - } - }; -} http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java deleted file mode 100644 index 1449325..0000000 --- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.sql.runtime; - -import java.util.Map; - -import org.apache.storm.tuple.Values; - -/** - * Subclass of AbstractTupleProcessor provides a series of tuple. It - * takes a series of iterators of {@link Values} and produces a stream of - * tuple. - * <p/> - * The subclass implements the {@see next()} method to provide - * the output of the stream. It can choose to return null in {@see next()} to - * indicate that this particular iteration is a no-op. SQL processors depend - * on this semantic to implement filtering and nullable records. - */ -public abstract class AbstractValuesProcessor { - - /** - * Initialize the data sources. - * - * @param data a map from the table name to the iterators of the values. - * - */ - public abstract void initialize(Map<String, DataSource> data, ChannelHandler - result); -} http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java deleted file mode 100644 index 9d6f662..0000000 --- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.sql.runtime; - -import org.apache.storm.tuple.Values; - -public interface ChannelContext { - /** - * Emit data to the next stage of the data pipeline. - */ - void emit(Values data); - - void fireChannelInactive(); - - void flush(); - - void setSource(Object source); -} http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java deleted file mode 100644 index 3009df4..0000000 --- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.sql.runtime; - -import org.apache.storm.tuple.Values; - -/** - * DataListener provides an event-driven interface for the user to process - * series of events. - */ -public interface ChannelHandler { - void dataReceived(ChannelContext ctx, Values data); - - /** - * The producer of the data has indicated that the channel is no longer - * active. - * @param ctx ChannelContext - */ - void channelInactive(ChannelContext ctx); - - void exceptionCaught(Throwable cause); - - void flush(ChannelContext ctx); - - void setSource(ChannelContext ctx, Object source); -} http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java deleted file mode 100644 index d389c38..0000000 --- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.sql.runtime; - -import org.apache.storm.tuple.Values; - -public class Channels { - private static final ChannelContext VOID_CTX = new ChannelContext() { - @Override - public void emit(Values data) {} - - @Override - public void fireChannelInactive() {} - - @Override - public void flush() { - - } - - @Override - public void setSource(java.lang.Object source) { - - } - }; - - private static class ChannelContextAdapter implements ChannelContext { - private final ChannelHandler handler; - private final ChannelContext next; - - public ChannelContextAdapter( - ChannelContext next, ChannelHandler handler) { - this.handler = handler; - this.next = next; - } - - @Override - public void emit(Values data) { - handler.dataReceived(next, data); - } - - @Override - public void fireChannelInactive() { - handler.channelInactive(next); - } - - @Override - public void flush() { - handler.flush(next); - } - - @Override - public void setSource(java.lang.Object source) { - handler.setSource(next, source); - next.setSource(source); // propagate through the chain - } - } - - private static class ForwardingChannelContext implements ChannelContext { - private final ChannelContext next; - - public ForwardingChannelContext(ChannelContext next) { - this.next = next; - } - - @Override - public void emit(Values data) { - next.emit(data); - } - - @Override - public void fireChannelInactive() { - next.fireChannelInactive(); - } - - @Override - public void flush() { - next.flush(); - } - - @Override - public void setSource(Object source) { - next.setSource(source); - } - } - - public static ChannelContext chain( - ChannelContext next, ChannelHandler handler) { - return new ChannelContextAdapter(next, handler); - } - - public static ChannelContext voidContext() { - return VOID_CTX; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java deleted file mode 100644 index b90b4f5..0000000 --- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.sql.runtime; - -/** - * A DataSource ingests data in StormSQL. It provides a series of tuple to - * the downstream {@link ChannelHandler}. - * - */ -public interface DataSource { - void open(ChannelContext ctx); -} http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java index baec6cd..d89e815 100644 --- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java +++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java @@ -29,20 +29,6 @@ public interface DataSourcesProvider { */ String scheme(); - /** - * Construct a new data source. - * @param uri The URI that specifies the data source. The format of the URI - * is fully customizable. - * @param inputFormatClass the name of the class that deserializes data. - * It is null when unspecified. - * @param outputFormatClass the name of the class that serializes data. It - * is null when unspecified. - * @param fields The name of the fields and the schema of the table. - */ - DataSource construct( - URI uri, String inputFormatClass, String outputFormatClass, - List<FieldInfo> fields); - ISqlTridentDataSource constructTrident( URI uri, String inputFormatClass, String outputFormatClass, Properties properties, List<FieldInfo> fields); http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java index 9d8368a..7383235 100644 --- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java +++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java @@ -47,25 +47,6 @@ public class DataSourcesRegistry { } /** - * Construct a data source. - * @param uri data source uri - * @param inputFormatClass input format class - * @param outputFormatClass output format class - * @param fields fields info list - * @return DataSource object - */ - public static DataSource construct( - URI uri, String inputFormatClass, String outputFormatClass, - List<FieldInfo> fields) { - DataSourcesProvider provider = providers.get(uri.getScheme()); - if (provider == null) { - return null; - } - - return provider.construct(uri, inputFormatClass, outputFormatClass, fields); - } - - /** * Construct a trident data source. * @param uri data source uri * @param inputFormatClass input format class http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java deleted file mode 100644 index 095e6ba..0000000 --- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.sql.runtime; - -public class StormSqlFunctions { - - /** - * Whether the object equals the other one. - * @param b0 one object - * @param b1 the other object - * @return true if the object equals the other one - */ - public static Boolean eq(Object b0, Object b1) { - if (b0 == null || b1 == null) { - return null; - } - return b0.equals(b1); - } - - /** - * Whether the object dose not equals the other one. - * @param b0 one object - * @param b1 the other object - * @return true if the object dose not equals the other one - */ - public static Boolean ne(Object b0, Object b1) { - if (b0 == null || b1 == null) { - return null; - } - return !b0.equals(b1); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java index fe4d024..10e470d 100644 --- a/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java +++ b/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Properties; import org.apache.storm.spout.Scheme; -import org.apache.storm.sql.runtime.DataSource; import org.apache.storm.sql.runtime.DataSourcesProvider; import org.apache.storm.sql.runtime.FieldInfo; import org.apache.storm.sql.runtime.IOutputSerializer; @@ -79,11 +78,6 @@ public class SocketDataSourcesProvider implements DataSourcesProvider { } @Override - public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass, List<FieldInfo> fields) { - throw new UnsupportedOperationException(); - } - - @Override public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass, Properties properties, List<FieldInfo> fields) { String host = uri.getHost(); http://git-wip-us.apache.org/repos/asf/storm/blob/1dcd12de/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java index 05587ba..4c945fa 100644 --- a/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java +++ b/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java @@ -19,9 +19,6 @@ */ package org.apache.storm.sql; -import org.apache.storm.sql.runtime.ChannelContext; -import org.apache.storm.sql.runtime.ChannelHandler; -import org.apache.storm.sql.runtime.DataSource; import org.apache.storm.sql.runtime.ISqlTridentDataSource; import org.apache.storm.sql.runtime.SimpleSqlTridentConsumer; import org.apache.storm.task.IMetricsContext; @@ -63,7 +60,6 @@ public class TestUtils { } } - public static class TopN { public static PriorityQueue<Integer> init() { return new PriorityQueue<>(); @@ -89,107 +85,6 @@ public class TestUtils { } } - - public static class MockDataSource implements DataSource { - private final ArrayList<Values> RECORDS = new ArrayList<>(); - - public MockDataSource() { - for (int i = 0; i < 5; ++i) { - RECORDS.add(new Values(i, "x", null)); - } - } - - @Override - public void open(ChannelContext ctx) { - for (Values v : RECORDS) { - ctx.emit(v); - } - ctx.fireChannelInactive(); - } - } - - public static class MockGroupDataSource implements DataSource { - private final ArrayList<Values> RECORDS = new ArrayList<>(); - - public MockGroupDataSource() { - for (int i = 0; i < 10; ++i) { - RECORDS.add(new Values(i/3, i, (i+1)* 0.5, "x", i/2)); - } - } - - @Override - public void open(ChannelContext ctx) { - for (Values v : RECORDS) { - ctx.emit(v); - } - // force evaluation of the aggregate function on the last group - ctx.flush(); - ctx.fireChannelInactive(); - } - } - - public static class MockEmpDataSource implements DataSource { - private final ArrayList<Values> RECORDS = new ArrayList<>(); - - public MockEmpDataSource() { - RECORDS.add(new Values(1, "emp1", 1)); - RECORDS.add(new Values(2, "emp2", 1)); - RECORDS.add(new Values(3, "emp3", 2)); - } - - @Override - public void open(ChannelContext ctx) { - for (Values v : RECORDS) { - ctx.emit(v); - } - ctx.flush(); - ctx.fireChannelInactive(); - } - } - - public static class MockDeptDataSource implements DataSource { - private final ArrayList<Values> RECORDS = new ArrayList<>(); - - public MockDeptDataSource() { - RECORDS.add(new Values(1, "dept1")); - RECORDS.add(new Values(2, "dept2")); - RECORDS.add(new Values(3, "dept3")); - } - - @Override - public void open(ChannelContext ctx) { - for (Values v : RECORDS) { - ctx.emit(v); - } - ctx.flush(); - ctx.fireChannelInactive(); - } - } - - public static class MockNestedDataSource implements DataSource { - private final ArrayList<Values> RECORDS = new ArrayList<>(); - - public MockNestedDataSource() { - List<Integer> ints = Arrays.asList(100, 200, 300); - for (int i = 0; i < 5; ++i) { - Map<String, Integer> map = new HashMap<>(); - map.put("b", i); - map.put("c", i*i); - Map<String, Map<String, Integer>> mm = new HashMap<>(); - mm.put("a", map); - RECORDS.add(new Values(i, map, mm, ints)); - } - } - - @Override - public void open(ChannelContext ctx) { - for (Values v : RECORDS) { - ctx.emit(v); - } - ctx.fireChannelInactive(); - } - } - public static class MockState implements State { /** * Collect all values in a static variable as the instance will go through serialization and deserialization. @@ -244,6 +139,65 @@ public class TestUtils { } } + public static class MockSqlExprDataSource implements ISqlTridentDataSource { + @Override + public IBatchSpout getProducer() { + return new MockSqlExprDataSource.MockSpout(); + } + + @Override + public SqlTridentConsumer getConsumer() { + return new SimpleSqlTridentConsumer(new MockStateFactory(), new MockStateUpdater()); + } + + private static class MockSpout implements IBatchSpout { + private final ArrayList<Values> RECORDS = new ArrayList<>(); + private final Fields OUTPUT_FIELDS = new Fields("ID", "NAME", "ADDR"); + + public MockSpout() { + for (int i = 0; i < 5; ++i) { + RECORDS.add(new Values(i, "x", null)); + } + } + + private boolean emitted = false; + + @Override + public void open(Map<String, Object> conf, TopologyContext context) { + } + + @Override + public void emitBatch(long batchId, TridentCollector collector) { + if (emitted) { + return; + } + + for (Values r : RECORDS) { + collector.emit(r); + } + emitted = true; + } + + @Override + public void ack(long batchId) { + } + + @Override + public void close() { + } + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + + @Override + public Fields getOutputFields() { + return OUTPUT_FIELDS; + } + } + } + public static class MockSqlTridentDataSource implements ISqlTridentDataSource { @Override public IBatchSpout getProducer() { @@ -550,32 +504,6 @@ public class TestUtils { } } - public static class CollectDataChannelHandler implements ChannelHandler { - private final List<Values> values; - - public CollectDataChannelHandler(List<Values> values) { - this.values = values; - } - - @Override - public void dataReceived(ChannelContext ctx, Values data) { - values.add(data); - } - - @Override - public void channelInactive(ChannelContext ctx) {} - - @Override - public void exceptionCaught(Throwable cause) { - throw new RuntimeException(cause); - } - - @Override - public void flush(ChannelContext ctx) {} - - @Override - public void setSource(ChannelContext ctx, Object source) {} - } public static long monotonicNow() { final long NANOSECONDS_PER_MILLISECOND = 1000000;
