[
https://issues.apache.org/jira/browse/STORM-1030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089377#comment-15089377
]
ASF GitHub Bot commented on STORM-1030:
---------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/871#discussion_r49201617
--- Diff:
external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java ---
@@ -25,75 +26,40 @@
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.utils.TupleUtils;
import backtype.storm.Config;
+import org.apache.storm.hive.common.HiveConnector;
import org.apache.storm.hive.common.HiveWriter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hive.hcatalog.streaming.*;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.hive.common.HiveUtils;
-import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.ArrayList;
import java.util.Map;
-import java.util.HashMap;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.Map.Entry;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.List;
import java.util.LinkedList;
-import java.io.IOException;
public class HiveBolt extends BaseRichBolt {
private static final Logger LOG =
LoggerFactory.getLogger(HiveBolt.class);
private OutputCollector collector;
private HiveOptions options;
- private ExecutorService callTimeoutPool;
- private transient Timer heartBeatTimer;
- private Boolean kerberosEnabled = false;
- private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
- private UserGroupInformation ugi = null;
- HashMap<HiveEndPoint, HiveWriter> allWriters;
+ private HiveConnector hiveConnector;
private List<Tuple> tupleBatch;
+ transient CountMetric serializationErrorMetric;
public HiveBolt(HiveOptions options) {
this.options = options;
tupleBatch = new LinkedList<>();
+
}
@Override
public void prepare(Map conf, TopologyContext topologyContext,
OutputCollector collector) {
try {
- if(options.getKerberosPrincipal() == null &&
options.getKerberosKeytab() == null) {
- kerberosEnabled = false;
- } else if(options.getKerberosPrincipal() != null &&
options.getKerberosKeytab() != null) {
- kerberosEnabled = true;
- } else {
- throw new IllegalArgumentException("To enable Kerberos,
need to set both KerberosPrincipal " +
- " & KerberosKeytab");
- }
-
- if (kerberosEnabled) {
- try {
- ugi =
HiveUtils.authenticate(options.getKerberosKeytab(),
options.getKerberosPrincipal());
- } catch(HiveUtils.AuthenticationFailed ex) {
- LOG.error("Hive Kerberos authentication failed " +
ex.getMessage(), ex);
- throw new IllegalArgumentException(ex);
- }
- }
this.collector = collector;
- allWriters = new HashMap<HiveEndPoint,HiveWriter>();
- String timeoutName = "hive-bolt-%d";
- this.callTimeoutPool = Executors.newFixedThreadPool(1,
- new
ThreadFactoryBuilder().setNameFormat(timeoutName).build());
- heartBeatTimer = new Timer();
- setupHeartBeatTimer();
-
+ this.hiveConnector = new HiveConnector(this.options);
+ this.hiveConnector.configure();
+ this.serializationErrorMetric = new CountMetric();
+
topologyContext.registerMetric("hive_serialization_error_count",
serializationErrorMetric, 0);
--- End diff --
Not really sure what registering a metric that is reported every 0 seconds
does.
> Hive Connector Fixes
> --------------------
>
> Key: STORM-1030
> URL: https://issues.apache.org/jira/browse/STORM-1030
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-hive
> Reporter: Sriharsha Chintalapani
> Assignee: Sriharsha Chintalapani
> Fix For: 0.11.0
>
>
> 1. Schedule Hive transaction heartbeats outside of execute method.
> 2. Fix retiring idleWriters
> 3. Do not call flush if there is no data added to a txnbatch
> 4. Catch any exception and abort transaction.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)