Move Twitter Firehose Datasource to Test Source Folder

Change-Id: Iefe2130707012b8ce60f5dfac96635a1a515a076
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1290
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/cb92dad7
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/cb92dad7
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/cb92dad7

Branch: refs/heads/master
Commit: cb92dad732fb6094c70bfbd75cbfabe7352c3923
Parents: 9ddf5e0
Author: Abdullah Alamoudi <bamou...@gmail.com>
Authored: Sat Oct 15 08:50:24 2016 -0700
Committer: Michael Blow <mb...@apache.org>
Committed: Sat Oct 15 11:01:05 2016 -0700

----------------------------------------------------------------------
 .../queries/feeds/feeds_07/feeds_07.1.ddl.aql   |    4 +-
 .../queries/feeds/feeds_08/feeds_08.1.ddl.aql   |    3 +-
 .../queries/feeds/feeds_09/feeds_09.1.ddl.aql   |    4 +-
 .../feeds/feeds_07/feeds_07.1.ddl.sqlpp         |    4 +-
 .../feeds/feeds_08/feeds_08.1.ddl.sqlpp         |    3 +-
 .../feeds/feeds_09/feeds_09.1.ddl.sqlpp         |    4 +-
 asterixdb/asterix-experiments/pom.xml           |    1 +
 asterixdb/asterix-external-data/pom.xml         |   10 +-
 .../stream/TwitterFirehoseInputStream.java      |  158 ---
 .../factory/TwitterFirehoseStreamFactory.java   |  102 --
 .../provider/DatasourceFactoryProvider.java     |    6 -
 .../asterix/external/util/DataGenerator.java    | 1189 ----------------
 .../external/util/ExternalDataConstants.java    |    1 -
 .../asterix/external/util/TweetGenerator.java   |  154 --
 .../external/generator/DataGenerator.java       | 1192 ++++++++++++++++
 .../external/generator/TweetGenerator.java      |  154 ++
 .../stream/TwitterFirehoseInputStream.java      |  158 +++
 .../stream/TwitterFirehoseStreamFactory.java    |  102 ++
 asterixdb/asterix-tools/pom.xml                 |   18 +-
 .../asterix/tools/datagen/AdgClientDriver.java  |   52 -
 .../asterix/tools/datagen/AdmDataGen.java       | 1020 -------------
 .../asterix/tools/datagen/CustOrdDataGen.java   |  476 -------
 .../asterix/tools/datagen/EventDataGen.java     |  237 ----
 .../DataGeneratorForSpatialIndexEvaluation.java | 1341 ------------------
 .../tools/external/data/GULongIDGenerator.java  |   50 -
 ...TweetGeneratorForSpatialIndexEvaluation.java |  139 --
 .../apache/asterix/tools/tbltoadm/TblToAdm.java |   97 --
 .../tools/translator/ADGenDmlTranslator.java    |   91 --
 .../asterix/tools/datagen/AdgClientDriver.java  |   52 +
 .../asterix/tools/datagen/AdmDataGen.java       | 1020 +++++++++++++
 .../asterix/tools/datagen/CustOrdDataGen.java   |  476 +++++++
 .../asterix/tools/datagen/EventDataGen.java     |  237 ++++
 .../DataGeneratorForSpatialIndexEvaluation.java | 1341 ++++++++++++++++++
 .../tools/external/data/GULongIDGenerator.java  |   50 +
 ...TweetGeneratorForSpatialIndexEvaluation.java |  139 ++
 .../apache/asterix/tools/tbltoadm/TblToAdm.java |   97 ++
 .../tools/translator/ADGenDmlTranslator.java    |   91 ++
 37 files changed, 5139 insertions(+), 5134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql
index 70ea8d6..f3f8f7d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_07/feeds_07.1.ddl.aql
@@ -54,12 +54,12 @@ create dataset SyntheticTweets(TweetMessageType)
 primary key id;
 
 create feed  SyntheticTweetFeed
-using twitter_firehose(
+using stream(
 ("duration"="5"),
 ("tps"="50"),
 ("type-name"="TweetMessageType"),
 ("format"="adm"),
-("reader-stream"="twitter_firehose"),
+("stream-source"="org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory"),
 ("tput-duration"="5"),
 ("dataverse-dataset"="feeds:SyntheticTweets"),
 ("mode"="controlled"));
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql
index 658487b..c339563 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_08/feeds_08.1.ddl.aql
@@ -58,8 +58,9 @@ primary key id;
 create index ngram_index on SyntheticTweets(message_text) type ngram(3);
 
 create feed  SyntheticTweetFeed
-using twitter_firehose
+using stream
 (("duration"="5"),
+("stream-source"="org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory"),
 ("tps"="50"),
 ("type-name"="TweetMessageType"),
 ("tput-duration"="5"),

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql
index 6714850..59385c4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feeds_09/feeds_09.1.ddl.aql
@@ -56,12 +56,12 @@ primary key id;
 create index message_text on SyntheticTweets(message_text) type btree;
 
 create feed  SyntheticTweetFeed
-using twitter_firehose
+using stream
 (("duration"="5"),
 ("tps"="50"),
 ("tput-duration"="5"),
 ("type-name"="TweetMessageType"),
 ("dataverse-dataset"="feeds:SyntheticTweets"),
 ("format"="adm"),
-("reader-stream"="twitter_firehose"),
+("stream-source"="org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory"),
 ("mode"="controlled"));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp
index e3f3ae5..1f24192 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_07/feeds_07.1.ddl.sqlpp
@@ -45,12 +45,12 @@ create type feeds.TweetMessageType as
 
 create  dataset SyntheticTweets(TweetMessageType) primary key id;
 
-create  primary feed SyntheticTweetFeed using twitter_firehose(
+create  primary feed SyntheticTweetFeed using stream(
 (`duration`=`5`),
 (`tps`=`50`),
 (`type-name`=`TweetMessageType`),
 (`format`=`adm`),
-(`reader-stream`=`twitter_firehose`),
+(`stream-source`=`org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory`),
 (`tput-duration`=`5`),
 (`dataverse-dataset`=`feeds:SyntheticTweets`),
 (`mode`=`controlled`));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp
index a98b745..6311b8b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_08/feeds_08.1.ddl.sqlpp
@@ -45,9 +45,10 @@ create type feeds.TweetMessageType as
 
 create  dataset SyntheticTweets(TweetMessageType) primary key id;
 create  index ngram_index  on SyntheticTweets (message_text) type ngram (3);
-create  primary feed SyntheticTweetFeed using twitter_firehose (
+create  primary feed SyntheticTweetFeed using stream (
 (`duration`=`5`),
 (`tps`=`50`),
+(`stream-source`=`org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory`),
 (`type-name`=`TweetMessageType`),
 (`tput-duration`=`5`),
 (`dataverse-dataset`=`feeds:SyntheticTweets`),

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp
index 1b1c780..71a0ca2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feeds_09/feeds_09.1.ddl.sqlpp
@@ -45,7 +45,7 @@ create type feeds_09.TweetMessageType as
 }
 create  dataset SyntheticTweets(TweetMessageType) primary key id;
 create  index message_text  on SyntheticTweets (message_text) type btree;
-create  primary feed SyntheticTweetFeed using twitter_firehose ((`duration`=`5`),
+create  primary feed SyntheticTweetFeed using stream ((`duration`=`5`),
 (`tps`=`50`),(`tput-duration`=`5`),(`type-name`=`TweetMessageType`),
 (`dataverse-dataset`=`feeds:SyntheticTweets`),(`format`=`adm`),
-(`reader-stream`=`twitter_firehose`),(`mode`=`controlled`));
+(`stream-source`=`org.apache.asterix.external.input.stream.TwitterFirehoseStreamFactory`),(`mode`=`controlled`));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-experiments/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-experiments/pom.xml b/asterixdb/asterix-experiments/pom.xml
index b923fba..2718565 100644
--- a/asterixdb/asterix-experiments/pom.xml
+++ b/asterixdb/asterix-experiments/pom.xml
@@ -153,6 +153,7 @@
       <groupId>org.apache.asterix</groupId>
       <artifactId>asterix-tools</artifactId>
       <version>${project.version}</version>
+      <type>test-jar</type>
     </dependency>
     <dependency>
       <groupId>com.hierynomus</groupId>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-external-data/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 245f340..0cc6826 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -202,8 +202,6 @@
       <groupId>org.apache.asterix</groupId>
       <artifactId>asterix-runtime</artifactId>
       <version>${project.version}</version>
-      <type>jar</type>
-      <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
@@ -242,8 +240,7 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
-      <type>jar</type>
-      <scope>compile</scope>
+      <scope>test</scope>
       <exclusions>
         <exclusion>
           <groupId>com.sun.jersey.jersey-test-framework</groupId>
@@ -269,8 +266,6 @@
       <groupId>net.java.dev.rome</groupId>
       <artifactId>rome-fetcher</artifactId>
       <version>1.0.0</version>
-      <type>jar</type>
-      <scope>compile</scope>
       <exclusions>
         <exclusion>
           <artifactId>rome</artifactId>
@@ -287,11 +282,13 @@
       <groupId>jdom</groupId>
       <artifactId>jdom</artifactId>
       <version>1.0</version>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.microsoft.windowsazure</groupId>
       <artifactId>microsoft-windowsazure-api</artifactId>
       <version>0.4.4</version>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
@@ -302,6 +299,7 @@
       <groupId>javax.jdo</groupId>
       <artifactId>jdo2-api</artifactId>
       <version>2.3-20090302111651</version>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>com.e-movimento.tinytools</groupId>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
deleted file mode 100644
index e2afd7b..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.util.TweetGenerator;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class TwitterFirehoseInputStream extends AsterixInputStream {
-
-    private static final Logger LOGGER = Logger.getLogger(TwitterFirehoseInputStream.class.getName());
-    private final ExecutorService executorService;
-    private final PipedOutputStream outputStream;
-    private final PipedInputStream inputStream;
-    private final DataProvider dataProvider;
-    private boolean started;
-
-    public TwitterFirehoseInputStream(Map<String, String> configuration, IHyracksTaskContext ctx, int partition)
-            throws IOException {
-        executorService = Executors.newCachedThreadPool();
-        outputStream = new PipedOutputStream();
-        inputStream = new PipedInputStream(outputStream);
-        dataProvider = new DataProvider(configuration, partition, outputStream);
-        started = false;
-    }
-
-    @Override
-    public boolean stop() throws IOException {
-        dataProvider.stop();
-        return true;
-    }
-
-    public synchronized void start() {
-        if (!started) {
-            executorService.execute(dataProvider);
-            started = true;
-        }
-    }
-
-    @Override
-    public int read() throws IOException {
-        if (!started) {
-            start();
-        }
-        return inputStream.read();
-    }
-
-    @Override
-    public int read(byte b[], int off, int len) throws IOException {
-        if (!started) {
-            start();
-        }
-        return inputStream.read(b, off, len);
-    }
-
-    @Override
-    public boolean handleException(Throwable th) {
-        return false;
-    }
-
-    private static class DataProvider implements Runnable {
-
-        public static final String KEY_MODE = "mode";
-
-        private final TweetGenerator tweetGenerator;
-        private boolean continuePush = true;
-        private int batchSize;
-        private final Mode mode;
-        private final OutputStream os;
-
-        public static enum Mode {
-            AGGRESSIVE,
-            CONTROLLED
-        }
-
-        public DataProvider(Map<String, String> configuration, int partition, OutputStream os) {
-            this.tweetGenerator = new TweetGenerator(configuration, partition);
-            this.tweetGenerator.registerSubscriber(os);
-            this.os = os;
-            mode = configuration.get(KEY_MODE) != null ? Mode.valueOf(configuration.get(KEY_MODE).toUpperCase())
-                    : Mode.AGGRESSIVE;
-            switch (mode) {
-                case CONTROLLED:
-                    String tpsValue = configuration.get(TweetGenerator.KEY_TPS);
-                    if (tpsValue == null) {
-                        throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
-                    }
-                    batchSize = Integer.parseInt(tpsValue);
-                    break;
-                case AGGRESSIVE:
-                    batchSize = 5000;
-                    break;
-            }
-        }
-
-        @Override
-        public void run() {
-            boolean moreData = true;
-            long startBatch;
-            long endBatch;
-            while (true) {
-                try {
-                    while (moreData && continuePush) {
-                        switch (mode) {
-                            case AGGRESSIVE:
-                                moreData = tweetGenerator.generateNextBatch(batchSize);
-                                break;
-                            case CONTROLLED:
-                                startBatch = System.currentTimeMillis();
-                                moreData = tweetGenerator.generateNextBatch(batchSize);
-                                endBatch = System.currentTimeMillis();
-                                if ((endBatch - startBatch) < 1000) {
-                                    Thread.sleep(1000 - (endBatch - startBatch));
-                                }
-                                break;
-                        }
-                    }
-                    os.close();
-                    break;
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Exception in adapter " + e.getMessage());
-                    }
-                }
-            }
-        }
-
-        public void stop() {
-            continuePush = false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
deleted file mode 100644
index abe67fd..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.input.stream.factory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IInputStreamFactory;
-import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
-import org.apache.asterix.runtime.util.ClusterStateManager;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Factory class for creating @see{TwitterFirehoseFeedAdapter}. The adapter
- * simulates a twitter firehose with tweets being "pushed" into Asterix at a
- * configurable rate measured in terms of TPS (tweets/second). The stream of
- * tweets lasts for a configurable duration (measured in seconds).
- */
-public class TwitterFirehoseStreamFactory implements IInputStreamFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * Degree of parallelism for feed ingestion activity. Defaults to 1. This
-     * determines the count constraint for the ingestion operator.
-     **/
-    private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
-
-    /**
-     * The absolute locations where ingestion operator instances will be placed.
-     **/
-    private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
-
-    private Map<String, String> configuration;
-
-    @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        String ingestionCardinalityParam = configuration.get(KEY_INGESTION_CARDINALITY);
-        String ingestionLocationParam = configuration.get(KEY_INGESTION_LOCATIONS);
-        String[] locations = null;
-        if (ingestionLocationParam != null) {
-            locations = ingestionLocationParam.split(",");
-        }
-        int count = locations != null ? locations.length : 1;
-        if (ingestionCardinalityParam != null) {
-            count = Integer.parseInt(ingestionCardinalityParam);
-        }
-
-        List<String> chosenLocations = new ArrayList<String>();
-        String[] availableLocations = locations != null ? locations
-                : ClusterStateManager.INSTANCE.getParticipantNodes().toArray(new String[] {});
-        for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
-            chosenLocations.add(availableLocations[k]);
-        }
-        return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[] {}));
-    }
-
-    @Override
-    public DataSourceType getDataSourceType() {
-        return DataSourceType.STREAM;
-    }
-
-    @Override
-    public void configure(Map<String, String> configuration) {
-        this.configuration = configuration;
-    }
-
-    @Override
-    public boolean isIndexible() {
-        return false;
-    }
-
-    @Override
-    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
-        try {
-            return new TwitterFirehoseInputStream(configuration, ctx, partition);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cb92dad7/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index ad11171..7ab6430 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -33,7 +33,6 @@ import org.apache.asterix.external.input.record.reader.twitter.TwitterRecordRead
 import org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactory;
 import org.apache.asterix.external.input.stream.factory.SocketClientInputStreamFactory;
 import org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
-import org.apache.asterix.external.input.stream.factory.TwitterFirehoseStreamFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 
@@ -73,9 +72,6 @@ public class DatasourceFactoryProvider {
                 case ExternalDataConstants.STREAM_SOCKET_CLIENT:
                     streamSourceFactory = new SocketServerInputStreamFactory();
                     break;
-                case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
-                    streamSourceFactory = new TwitterFirehoseStreamFactory();
-                    break;
                 default:
                     try {
                         streamSourceFactory = (IInputStreamFactory) Class.forName(streamSource).newInstance();
@@ -102,8 +98,6 @@ public class DatasourceFactoryProvider {
             case ExternalDataConstants.READER_PUSH_TWITTER:
             case ExternalDataConstants.READER_PULL_TWITTER:
                 return new TwitterRecordReaderFactory();
-            case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
-                return new StreamRecordReaderFactory(new TwitterFirehoseStreamFactory());
             case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
             case ExternalDataConstants.SOCKET:
                 return new StreamRecordReaderFactory(new SocketServerInputStreamFactory());

Reply via email to