[
https://issues.apache.org/jira/browse/STORM-1030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089383#comment-15089383
]
ASF GitHub Bot commented on STORM-1030:
---------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/871#discussion_r49201838
--- Diff:
external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveConnector.java
---
@@ -0,0 +1,241 @@
+package org.apache.storm.hive.common;
+
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class HiveConnector {
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveConnector.class);
+ private HiveOptions options;
+ private transient Timer heartBeatTimer;
+ private AtomicBoolean sendHeartBeat = new AtomicBoolean(true);
+ private UserGroupInformation ugi = null;
+ private Map<HiveEndPoint, HiveWriter> allWriters;
+ private ExecutorService callTimeoutPool;
+
+ public HiveConnector(HiveOptions options) {
+ this.options = options;
+
+ }
+
+ public void configure() {
+ boolean kerberosEnabled;
+
+ if(this.options.getKerberosPrincipal() == null &&
this.options.getKerberosKeytab() == null) {
+ kerberosEnabled = false;
+ } else if(options.getKerberosPrincipal() != null &&
options.getKerberosKeytab() != null) {
+ kerberosEnabled = true;
+ } else {
+ throw new IllegalArgumentException("To enable Kerberos, need
to set both KerberosPrincipal " +
+ " & KerberosKeytab");
+ }
+
+ if (kerberosEnabled) {
+ try {
+ ugi = HiveUtils.authenticate(options.getKerberosKeytab(),
options.getKerberosPrincipal());
+ } catch(HiveUtils.AuthenticationFailed ex) {
+ LOG.error("Hive Kerberos authentication failed " +
ex.getMessage(), ex);
+ throw new IllegalArgumentException(ex);
+ }
+ }
+
+ allWriters = new ConcurrentHashMap<HiveEndPoint, HiveWriter>();
+ String timeoutName = "hive-bolt-%d";
+ this.callTimeoutPool = Executors.newFixedThreadPool(1,
+ new
ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+ heartBeatTimer = new Timer();
+ setupHeartBeatTimer();
+ }
+
+
--- End diff --
Extra space
> Hive Connector Fixes
> --------------------
>
> Key: STORM-1030
> URL: https://issues.apache.org/jira/browse/STORM-1030
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-hive
> Reporter: Sriharsha Chintalapani
> Assignee: Sriharsha Chintalapani
> Fix For: 0.11.0
>
>
> 1. Schedule Hive transaction heartbeats outside of execute method.
> 2. Fix retiring idleWriters
> 3. Do not call flush if there is no data added to a txnbatch
> 4. Catch any exception and abort transaction.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)