Repository: incubator-streams Updated Branches: refs/heads/master 7662e2733 -> 11e3a0f1b
STREAMS-421: Delete defunct or not-implemented runtime modules Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/11e3a0f1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/11e3a0f1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/11e3a0f1 Branch: refs/heads/master Commit: 11e3a0f1b78e548c7678a91f6793c8568cfca0c9 Parents: 7662e27 Author: smarthi <[email protected]> Authored: Thu Oct 20 14:19:30 2016 -0400 Committer: smarthi <[email protected]> Committed: Thu Oct 20 14:19:30 2016 -0400 ---------------------------------------------------------------------- streams-runtimes/pom.xml | 1 - .../streams-runtime-storm/README.md | 8 -- .../trident/StreamsPersistWriterState.java | 124 ------------------- .../storm/trident/StreamsProcessorFunction.java | 72 ----------- .../storm/trident/StreamsProviderSpout.java | 85 ------------- .../streams/storm/trident/StreamsTopology.java | 73 ----------- 6 files changed, 363 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/pom.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/pom.xml b/streams-runtimes/pom.xml index 0194b45..ed18c65 100644 --- a/streams-runtimes/pom.xml +++ b/streams-runtimes/pom.xml @@ -37,6 +37,5 @@ <module>streams-runtime-dropwizard</module> <module>streams-runtime-local</module> <module>streams-runtime-pig</module> - <module>streams-runtime-storm</module> </modules> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/README.md ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-storm/README.md b/streams-runtimes/streams-runtime-storm/README.md deleted file mode 100644 index 08d0c3d..0000000 --- a/streams-runtimes/streams-runtime-storm/README.md +++ /dev/null @@ -1,8 +0,0 @@ -Apache Streams (incubating) -Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0 --------------------------------------------------------------------------------- - -org.apache.streams:streams-runtime-storm -======================================== - -[README.md](src/site/markdown/index.md "README") http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java deleted file mode 100644 index e8b4a09..0000000 --- a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsPersistWriterState.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.storm.trident; - -import backtype.storm.task.IMetricsContext; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistWriter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.trident.operation.TridentCollector; -import storm.trident.state.BaseStateUpdater; -import storm.trident.state.State; -import storm.trident.state.StateFactory; -import storm.trident.tuple.TridentTuple; - -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -/** - * Created by sblackmon on 1/16/14. - */ -public class StreamsPersistWriterState implements State { - - private final static Logger LOGGER = LoggerFactory.getLogger(StreamsPersistWriterState.class); - - StreamsPersistWriter writer; - StreamsPersistStateController controller; - - public StreamsPersistWriterState(StreamsPersistStateController controller) { - this.controller = new StreamsPersistStateController(); - writer.prepare(null); - } - - public void bulkMessages(List<TridentTuple> tuples) { - for (TridentTuple tuple : tuples) { - StreamsDatum entry = this.controller.fromTuple(tuple); - try { - writer.write(entry); - } catch (Exception e) { - LOGGER.error("Exception writing entry : {}", e, entry); - } - } - LOGGER.debug("******** Ending commit"); - } - - @Override - public void beginCommit(Long aLong) { - - } - - @Override - public void commit(Long aLong) { - - } - - public static class Factory implements StateFactory { - - private Logger logger; - private StreamsPersistStateController controller; - - public Factory(StreamsPersistWriter writer, StreamsPersistStateController controller) { - this.controller = controller; - this.logger = LoggerFactory.getLogger(Factory.class); - } - - @Override - public State makeState(Map map, IMetricsContext iMetricsContext, int i, int i2) { - this.logger.debug("Called makeState. . . "); - // convert map to config object - return new StreamsPersistWriterState(controller); - } - - } - - public static class StreamsPersistStateController implements Serializable { - - private String fieldName; - private ObjectMapper mapper = new ObjectMapper(); - - public StreamsPersistStateController() { - this.fieldName = "datum"; - } - - public StreamsPersistStateController(String fieldName) { - this.fieldName = fieldName; - } - - public StreamsDatum fromTuple(TridentTuple tuple) { - return mapper.convertValue(tuple.getValueByField(this.fieldName), StreamsDatum.class); - } - - } - - - - public static class StreamsPersistWriterSendMessage extends BaseStateUpdater<StreamsPersistWriterState> { - - private Logger logger = LoggerFactory.getLogger(StreamsPersistWriterSendMessage.class); - - @Override - public void updateState(StreamsPersistWriterState writerState, List<TridentTuple> tridentTuples, TridentCollector tridentCollector) { - this.logger.debug("**** calling send message. . ."); - writerState.bulkMessages(tridentTuples); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java deleted file mode 100644 index 4e183f0..0000000 --- a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProcessorFunction.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.storm.trident; - -import com.google.common.collect.Lists; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsPersistWriter; -import org.apache.streams.core.StreamsProcessor; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.trident.operation.Function; -import storm.trident.operation.TridentCollector; -import storm.trident.operation.TridentOperationContext; -import storm.trident.tuple.TridentTuple; - -import java.math.BigInteger; -import java.util.List; -import java.util.Map; - -/** - * Created by sblackmon on 4/6/14. - */ -public class StreamsProcessorFunction implements Function { - - private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProcessorFunction.class); - - StreamsProcessor processor; - - @Override - public void execute(TridentTuple objects, TridentCollector tridentCollector) { - StreamsDatum datum = new StreamsDatum( - objects.getValueByField("document"), - new DateTime(objects.getLongByField("timestamp")), - new BigInteger(objects.getStringByField("sequenceid")) - ); - List<StreamsDatum> results = processor.process(datum); - for( StreamsDatum result : results ) { - tridentCollector.emit( Lists.newArrayList( - datum.getTimestamp(), - datum.getSequenceid(), - datum.getDocument() - )); - } - } - - @Override - public void prepare(Map map, TridentOperationContext tridentOperationContext) { - processor.prepare(map); - } - - @Override - public void cleanup() { - processor.cleanUp(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java deleted file mode 100644 index ea90558..0000000 --- a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsProviderSpout.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.storm.trident; - -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Fields; -import com.google.common.collect.Lists; -import org.apache.commons.collections4.IteratorUtils; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.trident.operation.TridentCollector; -import storm.trident.spout.IBatchSpout; - -import java.util.List; -import java.util.Map; - -/** - * Created by sblackmon on 1/16/14. - */ -public class StreamsProviderSpout implements IBatchSpout { - - private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderSpout.class); - - StreamsProvider provider; - - public StreamsProviderSpout(StreamsProvider provider) { - this.provider = provider; - } - - @Override - public void open(Map map, TopologyContext topologyContext) { - provider.prepare(topologyContext); - } - - @Override - public synchronized void emitBatch(long l, TridentCollector tridentCollector) { - List<StreamsDatum> batch; - batch = IteratorUtils.toList(provider.readCurrent().iterator()); - for( StreamsDatum datum : batch ) { - tridentCollector.emit( Lists.newArrayList( - datum.getTimestamp(), - datum.getSequenceid(), - datum.getDocument() - )); - } - } - - @Override - public void ack(long l) { - - } - - @Override - public void close() { - provider.cleanUp(); - } - - @Override - public Map getComponentConfiguration() { - return null; - } - - @Override - public Fields getOutputFields() { - return new Fields("timestamp", "sequenceid", "document"); - } -}; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/11e3a0f1/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsTopology.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsTopology.java b/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsTopology.java deleted file mode 100644 index 0bc97bd..0000000 --- a/streams-runtimes/streams-runtime-storm/src/main/java/org/apache/streams/storm/trident/StreamsTopology.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.storm.trident; - -import backtype.storm.Config; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import storm.trident.TridentTopology; - -/** - * Created with IntelliJ IDEA. - * User: sblackmon - * Date: 9/20/13 - * Time: 5:48 PM - * To change this template use File | Settings | File Templates. - */ -public abstract class StreamsTopology extends TridentTopology { - - StreamsConfiguration configuration; - Config stormConfig; - String runmode; - - protected StreamsTopology() { - - runmode = StreamsConfigurator.config.getConfig("storm").getString("runmode"); - stormConfig = new Config(); - - } - - protected StreamsTopology(StreamsConfiguration configuration) { - this.configuration = configuration; - } - - public StreamsConfiguration getConfiguration() { - return configuration; - } - - public void setConfiguration(StreamsConfiguration configuration) { - this.configuration = configuration; - } - - public String getRunmode() { - return runmode; - } - - public void setRunmode(String runmode) { - this.runmode = runmode; - } - - public Config getStormConfig() { - return stormConfig; - } - - public void setStormConfig(Config stormConfig) { - this.stormConfig = stormConfig; - } -}
