Move Twitter Firehose Datasource to Test Source Folder

Change-Id: Iefe2130707012b8ce60f5dfac96635a1a515a076
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1290
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/cb92dad7 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/cb92dad7 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/cb92dad7 Branch: refs/heads/master Commit: cb92dad732fb6094c70bfbd75cbfabe7352c3923 Parents: 9ddf5e0 Author: Abdullah Alamoudi <bamou...@gmail.com> Authored: Sat Oct 15 08:50:24 2016 -0700 Committer: Michael Blow <mb...@apache.org> Committed: Sat Oct 15 11:01:05 2016 -0700 ---------------------------------------------------------------------- .../queries/feeds/feeds_07/feeds_07.1.ddl.aql | 4 +- .../queries/feeds/feeds_08/feeds_08.1.ddl.aql | 3 +- .../queries/feeds/feeds_09/feeds_09.1.ddl.aql | 4 +- .../feeds/feeds_07/feeds_07.1.ddl.sqlpp | 4 +- .../feeds/feeds_08/feeds_08.1.ddl.sqlpp | 3 +- .../feeds/feeds_09/feeds_09.1.ddl.sqlpp | 4 +- asterixdb/asterix-experiments/pom.xml | 1 + asterixdb/asterix-external-data/pom.xml | 10 +- .../stream/TwitterFirehoseInputStream.java | 158 --- .../factory/TwitterFirehoseStreamFactory.java | 102 -- .../provider/DatasourceFactoryProvider.java | 6 - .../asterix/external/util/DataGenerator.java | 1189 ---------------- .../external/util/ExternalDataConstants.java | 1 - .../asterix/external/util/TweetGenerator.java | 154 -- .../external/generator/DataGenerator.java | 1192 ++++++++++++++++ .../external/generator/TweetGenerator.java | 154 ++ .../stream/TwitterFirehoseInputStream.java | 158 +++ .../stream/TwitterFirehoseStreamFactory.java | 102 ++ asterixdb/asterix-tools/pom.xml | 18 +- .../asterix/tools/datagen/AdgClientDriver.java | 52 - .../asterix/tools/datagen/AdmDataGen.java | 1020 ------------- .../asterix/tools/datagen/CustOrdDataGen.java | 476 ------- .../asterix/tools/datagen/EventDataGen.java | 237 ---- .../DataGeneratorForSpatialIndexEvaluation.java | 1341 ------------------ .../tools/external/data/GULongIDGenerator.java | 50 - 
...TweetGeneratorForSpatialIndexEvaluation.java | 139 -- .../apache/asterix/tools/tbltoadm/TblToAdm.java | 97 -- .../tools/translator/ADGenDmlTranslator.java | 91 -- .../asterix/tools/datagen/AdgClientDriver.java | 52 + .../asterix/tools/datagen/AdmDataGen.java | 1020 +++++++++++++ .../asterix/tools/datagen/CustOrdDataGen.java | 476 +++++++ .../asterix/tools/datagen/EventDataGen.java | 237 ++++ .../DataGeneratorForSpatialIndexEvaluation.java | 1341 ++++++++++++++++++ .../tools/external/data/GULongIDGenerator.java | 50 + ...TweetGeneratorForSpatialIndexEvaluation.java | 139 ++ .../apache/asterix/tools/tbltoadm/TblToAdm.java | 97 ++ .../tools/translator/ADGenDmlTranslator.java | 91 ++ 37 files changed, 5139 insertions(+), 5134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql index 70ea8d6..f3f8f7d 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql @@ -54,12 +54,12 @@ create dataset SyntheticTweets(TweetMessageType) primary key id; create feed SyntheticTweetFeed -using twitter_firehose( +using stream( ("duration"="5"), ("tps"="50"), ("type-name"="TweetMessageType"), ("format"="adm"), -("reader-stream"="twitter_firehose"), +("stream-source"="org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory"), ("tput-duration"="5"), ("dataverse-dataset"="feeds:SyntheticTweets"), ("mode"="controlled")); \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql index 658487b..c339563 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql @@ -58,8 +58,9 @@ primary key id; create index ngram_index on SyntheticTweets(message_text) type ngram(3); create feed SyntheticTweetFeed -using twitter_firehose +using stream (("duration"="5"), +("stream-source"="org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory"), ("tps"="50"), ("type-name"="TweetMessageType"), ("tput-duration"="5"), http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql index 6714850..59385c4 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql @@ -56,12 +56,12 @@ primary key id; create index message_text on SyntheticTweets(message_text) type btree; create feed SyntheticTweetFeed -using twitter_firehose +using stream (("duration"="5"), ("tps"="50"), ("tput-duration"="5"), ("type-name"="TweetMessageType"), ("dataverse-dataset"="feeds:SyntheticTweets"), 
("format"="adm"), -("reader-stream"="twitter_firehose"), +("stream-source"="org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory"), ("mode"="controlled")); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp index e3f3ae5..1f24192 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp @@ -45,12 +45,12 @@ create type feeds.TweetMessageType as create dataset SyntheticTweets(TweetMessageType) primary key id; -create primary feed SyntheticTweetFeed using twitter_firehose( +create primary feed SyntheticTweetFeed using stream( (`duration`=`5`), (`tps`=`50`), (`type-name`=`TweetMessageType`), (`format`=`adm`), -(`reader-stream`=`twitter_firehose`), +(`stream-source`=`org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory`), (`tput-duration`=`5`), (`dataverse-dataset`=`feeds:SyntheticTweets`), (`mode`=`controlled`)); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp index a98b745..6311b8b 100644 --- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp @@ -45,9 +45,10 @@ create type feeds.TweetMessageType as create dataset SyntheticTweets(TweetMessageType) primary key id; create index ngram_index on SyntheticTweets (message_text) type ngram (3); -create primary feed SyntheticTweetFeed using twitter_firehose ( +create primary feed SyntheticTweetFeed using stream ( (`duration`=`5`), (`tps`=`50`), +(`stream-source`=`org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory`), (`type-name`=`TweetMessageType`), (`tput-duration`=`5`), (`dataverse-dataset`=`feeds:SyntheticTweets`), http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp index 1b1c780..71a0ca2 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp @@ -45,7 +45,7 @@ create type feeds_09.TweetMessageType as } create dataset SyntheticTweets(TweetMessageType) primary key id; create index message_text on SyntheticTweets (message_text) type btree; -create primary feed SyntheticTweetFeed using twitter_firehose ((`duration`=`5`), +create primary feed SyntheticTweetFeed using stream ((`duration`=`5`), (`tps`=`50`),(`tput-duration`=`5`),(`type-name`=`TweetMessageType`), (`dataverse-dataset`=`feeds:SyntheticTweets`),(`format`=`adm`), -(`reader-stream`=`twitter_firehose`),(`mode`=`controlled`)); 
+(`stream-source`=`org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory`),(`mode`=`controlled`)); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-experiments/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-experiments/pom.xml b/asterixdb/asterix-experiments/pom.xml index b923fba..2718565 100644 --- a/asterixdb/asterix-experiments/pom.xml +++ b/asterixdb/asterix-experiments/pom.xml @@ -153,6 +153,7 @@ <groupId>org.apache.asterix</groupId> <artifactId>asterix-tools</artifactId> <version>${project.version}</version> + <type>test-jar</type> </dependency> <dependency> <groupId>com.hierynomus</groupId> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-external-data/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index 245f340..0cc6826 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -202,8 +202,6 @@ <groupId>org.apache.asterix</groupId> <artifactId>asterix-runtime</artifactId> <version>${project.version}</version> - <type>jar</type> - <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.hyracks</groupId> @@ -242,8 +240,7 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> - <type>jar</type> - <scope>compile</scope> + <scope>test</scope> <exclusions> <exclusion> <groupId>com.sun.jersey.jersey-test-framework</groupId> @@ -269,8 +266,6 @@ <groupId>net.java.dev.rome</groupId> <artifactId>rome-fetcher</artifactId> <version>1.0.0</version> - <type>jar</type> - <scope>compile</scope> <exclusions> <exclusion> <artifactId>rome</artifactId> @@ -287,11 +282,13 @@ <groupId>jdom</groupId> <artifactId>jdom</artifactId> <version>1.0</version> + <scope>test</scope> </dependency> <dependency> 
<groupId>com.microsoft.windowsazure</groupId> <artifactId>microsoft-windowsazure-api</artifactId> <version>0.4.4</version> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hive</groupId> @@ -302,6 +299,7 @@ <groupId>javax.jdo</groupId> <artifactId>jdo2-api</artifactId> <version>2.3-20090302111651</version> + <scope>test</scope> </dependency> <dependency> <groupId>com.e-movimento.tinytools</groupId> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java deleted file mode 100644 index e2afd7b..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.external.input.stream; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.external.api.AsterixInputStream; -import org.apache.asterix.external.util.TweetGenerator; -import org.apache.hyracks.api.context.IHyracksTaskContext; - -public class TwitterFirehoseInputStream extends AsterixInputStream { - - private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStream.class.getName()); - private final ExecutorService executorService; - private final PipedOutputStream outputStream; - private final PipedInputStream inputStream; - private final DataProvider dataProvider; - private boolean started; - - public TwitterFirehoseInputStream(Map<String, String> configuration, IHyracksTaskContext ctx, int partition) - throws IOException { - executorService = Executors.newCachedThreadPool(); - outputStream = new PipedOutputStream(); - inputStream = new PipedInputStream(outputStream); - dataProvider = new DataProvider(configuration, partition, outputStream); - started = false; - } - - @Override - public boolean stop() throws IOException { - dataProvider.stop(); - return true; - } - - public synchronized void start() { - if (!started) { - executorService.execute(dataProvider); - started = true; - } - } - - @Override - public int read() throws IOException { - if (!started) { - start(); - } - return inputStream.read(); - } - - @Override - public int read(byte b[], int off, int len) throws IOException { - if (!started) { - start(); - } - return inputStream.read(b, off, len); - } - - @Override - public boolean handleException(Throwable th) { - return false; - } - - private static class DataProvider implements Runnable { - - public static final String KEY_MODE = 
"mode"; - - private final TweetGenerator tweetGenerator; - private boolean continuePush = true; - private int batchSize; - private final Mode mode; - private final OutputStream os; - - public static enum Mode { - AGGRESSIVE, - CONTROLLED - } - - public DataProvider(Map<String, String> configuration, int partition, OutputStream os) { - this.tweetGenerator = new TweetGenerator(configuration, partition); - this.tweetGenerator.registerSubscriber(os); - this.os = os; - mode = configuration.get(KEY_MODE) != null ? Mode.valueOf(configuration.get(KEY_MODE).toUpperCase()) - : Mode.AGGRESSIVE; - switch (mode) { - case CONTROLLED: - String tpsValue = configuration.get(TweetGenerator.KEY_TPS); - if (tpsValue == null) { - throw new IllegalArgumentException("TPS value not configured. use tps=<value>"); - } - batchSize = Integer.parseInt(tpsValue); - break; - case AGGRESSIVE: - batchSize = 5000; - break; - } - } - - @Override - public void run() { - boolean moreData = true; - long startBatch; - long endBatch; - while (true) { - try { - while (moreData && continuePush) { - switch (mode) { - case AGGRESSIVE: - moreData = tweetGenerator.generateNextBatch(batchSize); - break; - case CONTROLLED: - startBatch = System.currentTimeMillis(); - moreData = tweetGenerator.generateNextBatch(batchSize); - endBatch = System.currentTimeMillis(); - if ((endBatch - startBatch) < 1000) { - Thread.sleep(1000 - (endBatch - startBatch)); - } - break; - } - } - os.close(); - break; - } catch (Exception e) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Exception in adapter " + e.getMessage()); - } - } - } - } - - public void stop() { - continuePush = false; - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java deleted file mode 100644 index abe67fd..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.external.input.stream.factory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.asterix.external.api.AsterixInputStream; -import org.apache.asterix.external.api.IInputStreamFactory; -import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream; -import org.apache.asterix.runtime.util.ClusterStateManager; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -/** - * Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter - * simulates a twitter firehose with tweets being "pushed" into Asterix at a - * configurable rate measured in terms of TPS (tweets/second). The stream of - * tweets lasts for a configurable duration (measured in seconds). - */ -public class TwitterFirehoseStreamFactory implements IInputStreamFactory { - - private static final long serialVersionUID = 1L; - - /** - * Degree of parallelism for feed ingestion activity. Defaults to 1. This - * determines the count constraint for the ingestion operator. - **/ - private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality"; - - /** - * The absolute locations where ingestion operator instances will be placed. - **/ - private static final String KEY_INGESTION_LOCATIONS = "ingestion-location"; - - private Map<String, String> configuration; - - @Override - public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { - String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY); - String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS); - String[] locations = null; - if (ingestionLocationParam != null) { - locations = ingestionLocationParam.split(","); - } - int count = locations != null ? 
locations.length : 1; - if (ingestionCardinalityParam != null) { - count = Integer.parseInt(ingestionCardinalityParam); - } - - List<String> chosenLocations = new ArrayList<String>(); - String[] availableLocations = locations != null ? locations - : ClusterStateManager.INSTANCE.getParticipantNodes().toArray(new String[] {}); - for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) { - chosenLocations.add(availableLocations[k]); - } - return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[] {})); - } - - @Override - public DataSourceType getDataSourceType() { - return DataSourceType.STREAM; - } - - @Override - public void configure(Map<String, String> configuration) { - this.configuration = configuration; - } - - @Override - public boolean isIndexible() { - return false; - } - - @Override - public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException { - try { - return new TwitterFirehoseInputStream(configuration, ctx, partition); - } catch (IOException e) { - throw new HyracksDataException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java index ad11171..7ab6430 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java @@ -33,7 +33,6 @@ import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordRead import 
org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory; import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory; import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory; -import org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamFactory; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataUtils; @@ -73,9 +72,6 @@ public class DatasourceFactoryProvider { case ExternalDataConstants.STREAM_SOCKET_CLIENT: streamSourceFactory = new SocketServerInputStreamFactory(); break; - case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER: - streamSourceFactory = new TwitterFirehoseStreamFactory(); - break; default: try { streamSourceFactory = (IInputStreamFactory) Class.forName(streamSource).newInstance(); @@ -102,8 +98,6 @@ public class DatasourceFactoryProvider { case ExternalDataConstants.READER_PUSH_TWITTER: case ExternalDataConstants.READER_PULL_TWITTER: return new TwitterRecordReaderFactory(); - case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER: - return new StreamRecordReaderFactory(new TwitterFirehoseStreamFactory()); case ExternalDataConstants.ALIAS_SOCKET_ADAPTER: case ExternalDataConstants.SOCKET: return new StreamRecordReaderFactory(new SocketServerInputStreamFactory());