[
https://issues.apache.org/jira/browse/PIRK-21?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15432532#comment-15432532
]
ASF GitHub Bot commented on PIRK-21:
------------------------------------
Github user tellison commented on a diff in the pull request:
https://github.com/apache/incubator-pirk/pull/76#discussion_r75834645
--- Diff:
src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
---
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.spark.streaming;
+
+import java.math.BigInteger;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pirk.inputformat.hadoop.BaseInputFormat;
+import org.apache.pirk.inputformat.hadoop.InputFormatConst;
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.responder.wideskies.spark.Accumulators;
+import org.apache.pirk.responder.wideskies.spark.BroadcastVars;
+import org.apache.pirk.responder.wideskies.spark.EncColMultGroupedMapper;
+import org.apache.pirk.responder.wideskies.spark.EncColMultReducer;
+import org.apache.pirk.responder.wideskies.spark.EncRowCalc;
+import org.apache.pirk.responder.wideskies.spark.FilterData;
+import
org.apache.pirk.responder.wideskies.spark.HashSelectorsAndPartitionData;
+import org.apache.pirk.schema.data.DataSchema;
+import org.apache.pirk.schema.data.DataSchemaLoader;
+import org.apache.pirk.schema.data.DataSchemaRegistry;
+import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.schema.query.QuerySchemaLoader;
+import org.apache.pirk.schema.query.QuerySchemaRegistry;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.utils.PIRException;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.elasticsearch.hadoop.mr.EsInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Master class for the PIR query spark streaming application
+ * <p>
+ * NOTE:
+ * <p>
+ * - NOT using Elasticsearch in practice - there proved to be some speed issues
with ES and Spark that appear to be ES-Spark specific - leaving the code in
anticipation
+ * that the ES-Spark issues will be resolved...
+ * <p>
+ * - Even if rdd.count() calls are embedded in logger.debug statements,
they are still computed by Spark. Thus, they are commented out in the code below -
uncomment
+ * for rdd.count() debugging
+ *
+ */
+public class ComputeStreamingResponse
+{
+ private static final Logger logger =
LoggerFactory.getLogger(ComputeStreamingResponse.class);
+
+ private String dataInputFormat = null;
+ private String inputData = null;
+ private String outputFile = null;
+ private String outputDirExp = null;
+
+ private String queryInput = null;
+ QuerySchema qSchema = null;
+
+ private String esQuery = "none";
+ private String esResource = "none";
+
+ private FileSystem fs = null;
+ private HadoopFileSystemStore storage = null;
+ private JavaStreamingContext jssc = null;
+
+ boolean useQueueStream = false;
+
+ private long batchSeconds = 0;
+ private long windowLength = 0;
+
+ private Accumulators accum = null;
+ private BroadcastVars bVars = null;
+
+ private QueryInfo queryInfo = null;
+ Query query = null;
+
+ private int numDataPartitions = 0;
+ private int numColMultPartitions = 0;
+
+ private boolean colMultReduceByKey = false;
+
+ public ComputeStreamingResponse(FileSystem fileSys) throws Exception
+ {
+ fs = fileSys;
+ storage = new HadoopFileSystemStore(fs);
+
+ dataInputFormat =
SystemConfiguration.getProperty("pir.dataInputFormat");
+ if (!InputFormatConst.ALLOWED_FORMATS.contains(dataInputFormat))
+ {
+ throw new IllegalArgumentException("inputFormat = " +
dataInputFormat + " is of an unknown form");
+ }
+ logger.info("inputFormat = " + dataInputFormat);
+ if (dataInputFormat.equals(InputFormatConst.BASE_FORMAT))
+ {
+ inputData = SystemConfiguration.getProperty("pir.inputData", "none");
+ if (inputData.equals("none"))
+ {
+ throw new IllegalArgumentException("For inputFormat = " +
dataInputFormat + " an inputFile must be specified");
+ }
+ logger.info("inputFile = " + inputData);
+ }
+ else if (dataInputFormat.equals(InputFormatConst.ES))
+ {
+ esQuery = SystemConfiguration.getProperty("pir.esQuery", "none");
+ esResource = SystemConfiguration.getProperty("pir.esResource",
"none");
+ if (esQuery.equals("none"))
+ {
+ throw new IllegalArgumentException("esQuery must be specified");
+ }
+ if (esResource.equals("none"))
+ {
+ throw new IllegalArgumentException("esResource must be specified");
+ }
+ logger.info("esQuery = " + esQuery + " esResource = " + esResource);
+ }
+ outputFile = SystemConfiguration.getProperty("pir.outputFile");
+ outputDirExp = outputFile + "_exp";
+
+ queryInput = SystemConfiguration.getProperty("pir.queryInput");
+ String stopListFile =
SystemConfiguration.getProperty("pir.stopListFile");
+
+ logger.info("outputFile = " + outputFile + " queryInputDir = " +
queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery
+ + " esResource = " + esResource);
+
+ // Pull the batchSeconds and windowLength parameters
+ batchSeconds =
Long.parseLong(SystemConfiguration.getProperty("pir.sparkstreaming.batchSeconds",
"30"));
--- End diff --
```Long``` is a bit generous for a duration in seconds.
> Apache Spark Streaming Implementation
> -------------------------------------
>
> Key: PIRK-21
> URL: https://issues.apache.org/jira/browse/PIRK-21
> Project: PIRK
> Issue Type: Improvement
> Components: Responder
> Reporter: Ellison Anne Williams
> Assignee: Ellison Anne Williams
>
> Provide a Spark streaming implementation for Pirk.
> Although there is discussion and a forthcoming JIRA issue for Pirk
> integration with Apache Beam, we can, at the very least, use this
> implementation to benchmark straight Spark Streaming vs Beam + Spark
> Streaming.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)