[
https://issues.apache.org/jira/browse/STORM-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15394215#comment-15394215
]
ASF GitHub Bot commented on STORM-1979:
---------------------------------------
Github user harshach commented on a diff in the pull request:
https://github.com/apache/storm/pull/1583#discussion_r72303978
--- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.druid.bolt;
+
+import com.metamx.tranquility.tranquilizer.MessageDroppedException;
+import com.metamx.tranquility.tranquilizer.Tranquilizer;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Basic bolt implementation for storing data to Druid datastore.
+ * <p/>
+ * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
+ * to send to druid store.
+ * Some of the concepts are borrowed from Tranquility storm connector implementation.
+ * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
+ *
+ * This Bolt expects to receive tuples in which the zeroth element is your event type.
+ * <p/>
+ *
+ */
+public class DruidBeamBolt<E> extends BaseRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
+
+ private volatile OutputCollector collector;
+ private DruidBeamFactory<E> beamFactory = null;
+ private int batchSize;
+ private Tranquilizer<E> tranquilizer = null;
+
+ public DruidBeamBolt(DruidBeamFactory<E> beamFactory) {
+ this(beamFactory, 2000);
+ }
+
+ public DruidBeamBolt(DruidBeamFactory<E> beamFactory, int batchSize) {
+ this.beamFactory = beamFactory;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ tranquilizer = Tranquilizer.create(
+ beamFactory.makeBeam(stormConf, context),
+ batchSize,
+ Tranquilizer.DefaultMaxPendingBatches(),
+ Tranquilizer.DefaultLingerMillis());
+ this.tranquilizer.start();
+
+ }
+
+ @Override
+ public void execute(final Tuple tuple) {
+ Future future = tranquilizer.send((E)tuple.getValue(0));
+ future.addEventListener(new FutureEventListener() {
+ @Override
+ public void onFailure(Throwable cause) {
+ if(cause instanceof MessageDroppedException) {
--- End diff --
I am +1 on @satishd's recommendation. At least we can drive this through
configuration, i.e. Druid can discard messages and we can provide a discard
stream of sorts via config: if the user sets enableDiscardStream and
discardStream="druid-discard-stream", then the bolt writes the dropped tuples
there. Logging on the Druid side is great, but this will allow users to act on
the dropped tuples immediately.
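
A rough sketch of the routing decision I have in mind, written as plain Java so it stands alone. Everything here is hypothetical: the config keys are just the names floated above, `RecordingCollector` is a stand-in for Storm's OutputCollector, and the fallback stream name is an assumption. It is not code from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class DiscardStreamSketch {
    // Hypothetical config keys, named after the suggestion in this comment.
    static final String ENABLE_DISCARD_STREAM = "enableDiscardStream";
    static final String DISCARD_STREAM = "discardStream";
    // Assumed fallback when the stream is enabled but not named.
    static final String DEFAULT_DISCARD_STREAM = "druid-discard-stream";

    /** Stand-in for a collector: records (streamId, event) pairs. */
    static final class RecordingCollector {
        final List<String[]> emitted = new ArrayList<>();

        void emit(String streamId, String event) {
            emitted.add(new String[] {streamId, event});
        }
    }

    /** Returns the discard stream id, or null when the feature is off. */
    static String discardStreamFor(Map<String, Object> conf) {
        if (!Boolean.TRUE.equals(conf.get(ENABLE_DISCARD_STREAM))) {
            return null;
        }
        Object name = conf.get(DISCARD_STREAM);
        return name != null ? name.toString() : DEFAULT_DISCARD_STREAM;
    }

    /**
     * Called when Druid drops an event. Returns true if the event was
     * forwarded to the discard stream, false if the caller should only log.
     */
    static boolean onDropped(Map<String, Object> conf,
                             RecordingCollector collector,
                             String event) {
        String stream = discardStreamFor(conf);
        if (stream == null) {
            return false;
        }
        collector.emit(stream, event);
        return true;
    }
}
```

In the real bolt this check would sit in the MessageDroppedException branch of onFailure, and the discard stream would also need to be declared in declareOutputFields when the flag is on.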
> Storm Druid Connector
> ---------------------
>
> Key: STORM-1979
> URL: https://issues.apache.org/jira/browse/STORM-1979
> Project: Apache Storm
> Issue Type: Improvement
> Reporter: Sriharsha Chintalapani
> Assignee: Manikumar Reddy
>
> Storm Bolt & Trident state implementation for Druid.