[
https://issues.apache.org/jira/browse/PIRK-4?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15428674#comment-15428674
]
ASF GitHub Bot commented on PIRK-4:
-----------------------------------
Github user smarthi commented on a diff in the pull request:
https://github.com/apache/incubator-pirk/pull/74#discussion_r75537022
--- Diff:
src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java ---
@@ -0,0 +1,198 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Bolt to compute and output the final Response object for a query
+ * <p>
+ * Receives {@code <colIndex, colProduct>} tuples, computes the final
column product for each colIndex, records the results in the final Response
object, and
+ * outputs the final Response object for the query.
+ * <p>
+ * Flush signals are sent to the OutputBolt from the EncColMultBolts via a
tuple of the form {@code <-1, 0>}. Once a flush signal has been received from
each
+ * EncColMultBolt (or a timeout is reached), the final column product is
computed and the final Response is formed and emitted.
+ * <p>
+ * Currently, the Responses are written to HDFS to location specified by
the outputFile with the timestamp appended.
+ * <p>
+ * TODO: -- Enable other Response output locations
+ *
+ */
+public class OutputBolt extends BaseRichBolt
+{
+ private static final long serialVersionUID = 1L;
+
+ private static final org.slf4j.Logger logger =
LoggerFactory.getLogger(OutputBolt.class);
+
+ private OutputCollector outputCollector;
+ private QueryInfo queryInfo;
+ private Response response;
+ private String outputFile;
+ private boolean hdfs;
+ private String hdfsUri;
+ private Integer flushCounter = 0;
+ private ArrayList<Tuple> tuplesToAck = new ArrayList<Tuple>();
+ private Integer totalFlushSigs;
+
+ private LocalFileSystemStore localStore;
+ private HadoopFileSystemStore hadoopStore;
+
+ // This latch just serves as a hook for testing.
+ public static CountDownLatch latch = new CountDownLatch(1);
+
+ // This is the main object here. It holds column Id -> product
+ private HashMap<Long,BigInteger> resultsMap = new
HashMap<Long,BigInteger>();
--- End diff --
Use a Map on LHS
> Add Streaming Implementation for Apache Storm
> ---------------------------------------------
>
> Key: PIRK-4
> URL: https://issues.apache.org/jira/browse/PIRK-4
> Project: PIRK
> Issue Type: Task
> Components: Responder
> Reporter: Chris Harris
> Assignee: Chris Harris
>
> Per the Pirk Roadmap, this is a feature to add support for Apache Storm
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)