http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/MapStateTest.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/MapStateTest.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/MapStateTest.java index 3c10dd9..b47165b 100644 --- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/MapStateTest.java +++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/MapStateTest.java @@ -1,21 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.trident; import com.datastax.driver.core.Cluster; @@ -25,6 +19,8 @@ import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Truncate; import com.datastax.driver.core.schemabuilder.Create; import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import java.util.HashMap; +import java.util.Map; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.storm.Config; import org.apache.storm.LocalCluster; @@ -43,14 +39,13 @@ import org.apache.storm.trident.testing.FixedBatchSpout; import org.apache.storm.trident.testing.Split; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.junit.*; - -import java.util.HashMap; -import java.util.Map; - import static org.junit.Assert.assertEquals; public class MapStateTest { @@ -62,13 +57,20 @@ public class MapStateTest { private static Cluster cluster; private Session session; + protected static Column column(String name, DataType type) { + Column column = new Column(); + column.name = name; + column.type = type; + return column; + } + @Test public void nonTransactionalStateTest() throws Exception { StateFactory factory = MapStateFactoryBuilder.nontransactional(getCassandraConfig()) - .withTable("words_ks", "words_table") - .withKeys("word") - .withJSONBinaryState("state") - .build(); + .withTable("words_ks", "words_table") + .withKeys("word") + .withJSONBinaryState("state") + .build(); wordsTest(factory); } @@ -78,10 +80,10 @@ public class MapStateTest { Map<String, Object> config = new HashMap(); StateFactory factory = MapStateFactoryBuilder.transactional(getCassandraConfig()) - .withTable("words_ks", "words_table") - .withKeys("word") - .withJSONBinaryState("state") - .build(); + .withTable("words_ks", "words_table") + .withKeys("word") + .withJSONBinaryState("state") + .build(); wordsTest(factory); } @@ -91,10 +93,10 @@ public class MapStateTest { Map<String, Object> config = new HashMap(); StateFactory factory = MapStateFactoryBuilder.opaque(getCassandraConfig()) - .withTable("words_ks", "words_table") - .withKeys("word") - .withJSONBinaryState("state") - .build(); + .withTable("words_ks", "words_table") + .withKeys("word") + .withJSONBinaryState("state") + .build(); wordsTest(factory); } @@ -102,20 +104,20 @@ public class MapStateTest { public void wordsTest(StateFactory factory) throws Exception { FixedBatchSpout spout = new FixedBatchSpout( - new Fields("sentence"), 3, - new Values("the cow jumped over the moon"), - new Values("the man went to the store and bought some candy"), - new Values("four score and seven years ago"), - new Values("how many apples can you eat")); + new Fields("sentence"), 3, + new Values("the cow jumped over the moon"), + new Values("the man went to the store and bought some candy"), + new Values("four score and seven years ago"), + new Values("how many apples can you eat")); spout.setCycle(false); TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) - .each(new Fields("sentence"), new Split(), new Fields("word")) - .groupBy(new Fields("word")) - .persistentAggregate(factory, new Count(), new Fields("state")) - .parallelismHint(1); + .each(new Fields("sentence"), new Split(), new Fields("word")) + .groupBy(new Fields("word")) + .persistentAggregate(factory, new Count(), new Fields("state")) + .parallelismHint(1); LocalDRPC client = new LocalDRPC(); topology.newDRPCStream("words", client) @@ -134,7 +136,7 @@ public class MapStateTest { do { Thread.sleep(2000); count = session.execute(QueryBuilder.select().all().from("words_ks", "words_table")) - .getAvailableWithoutFetching(); + .getAvailableWithoutFetching(); logger.info("Found {} records", count); } while (count < 24); @@ -164,8 +166,8 @@ public class MapStateTest { createKeyspace("words_ks"); createTable("words_ks", "words_table", - column("word", DataType.varchar()), - column("state", DataType.blob())); + column("word", DataType.varchar()), + column("state", DataType.blob())); } @@ -178,11 +180,11 @@ public class MapStateTest { protected void createKeyspace(String keyspace) throws Exception { // Create keyspace not supported in the current datastax driver String createKeyspace = "CREATE KEYSPACE IF NOT EXISTS " - + keyspace - + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"; + + keyspace + + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"; logger.info(createKeyspace); if (!session.execute(createKeyspace) - .wasApplied()) { + .wasApplied()) { throw new Exception("Did not create keyspace " + keyspace); } } @@ -206,8 +208,8 @@ public class MapStateTest { replication.put("replication_factor", 1); Create createTable = SchemaBuilder.createTable(keyspace, table) - .ifNotExists() - .addPartitionKey(key.name, key.type); + .ifNotExists() + .addPartitionKey(key.name, key.type); for (Column field : fields) { createTable.addColumn(field.name, field.type); } @@ -215,13 +217,6 @@ public class MapStateTest { session.execute(createTable); } - protected static Column column(String name, DataType type) { - Column column = new Column(); - column.name = name; - column.type = type; - return column; - } - protected static class Column { public String name; public DataType type;
http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java index 6aaed56..736b6a8 100644 --- a/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java +++ b/external/storm-cassandra/src/test/java/org/apache/storm/cassandra/trident/WeatherBatchSpout.java @@ -1,37 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.cassandra.trident; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Fields; -import com.google.common.collect.Lists; -import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.spout.IBatchSpout; +package org.apache.storm.cassandra.trident; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.spout.IBatchSpout; +import org.apache.storm.tuple.Fields; /** * @@ -57,24 +47,25 @@ public class WeatherBatchSpout implements IBatchSpout { @Override public void emitBatch(long batchId, TridentCollector collector) { List<List<Object>> batch = this.batches.get(batchId); - if(batch == null){ + if (batch == null) { batch = new ArrayList<>(); - for (int i=0; i< batchSize; i++) { + for (int i = 0; i < batchSize; i++) { batch.add(createTuple()); } this.batches.put(batchId, batch); } - for(List<Object> list : batch){ + for (List<Object> list : batch) { collector.emit(list); } } private List<Object> createTuple() { final Random random = new Random(); - List<Object> values = new ArrayList<Object>(){{ + List<Object> values = new ArrayList<Object>() {{ add(stationIds[random.nextInt(stationIds.length)]); add(random.nextInt(100) + ""); - add(UUID.randomUUID());}}; + add(UUID.randomUUID()); + }}; return values; }
