[ 
https://issues.apache.org/jira/browse/STORM-1030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089383#comment-15089383
 ] 

ASF GitHub Bot commented on STORM-1030:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/871#discussion_r49201838
  
    --- Diff: 
external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveConnector.java
 ---
    @@ -0,0 +1,241 @@
    +package org.apache.storm.hive.common;
    +
    +
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hive.hcatalog.streaming.HiveEndPoint;
    +import org.apache.hive.hcatalog.streaming.StreamingException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.*;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import com.google.common.util.concurrent.ThreadFactoryBuilder;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +public class HiveConnector {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(HiveConnector.class);
    +    private HiveOptions options;
    +    private transient Timer heartBeatTimer;
    +    private AtomicBoolean sendHeartBeat = new AtomicBoolean(true);
    +    private UserGroupInformation ugi = null;
    +    private Map<HiveEndPoint, HiveWriter> allWriters;
    +    private ExecutorService callTimeoutPool;
    +
    +    public HiveConnector(HiveOptions options) {
    +        this.options = options;
    +
    +    }
    +
    +    public void configure() {
    +        boolean kerberosEnabled;
    +
    +        if(this.options.getKerberosPrincipal() == null && 
this.options.getKerberosKeytab() == null) {
    +            kerberosEnabled = false;
    +        } else if(options.getKerberosPrincipal() != null && 
options.getKerberosKeytab() != null) {
    +            kerberosEnabled = true;
    +        } else {
    +            throw new IllegalArgumentException("To enable Kerberos, need 
to set both KerberosPrincipal " +
    +                                               " & KerberosKeytab");
    +        }
    +
    +        if (kerberosEnabled) {
    +            try {
    +                ugi = HiveUtils.authenticate(options.getKerberosKeytab(), 
options.getKerberosPrincipal());
    +            } catch(HiveUtils.AuthenticationFailed ex) {
    +                LOG.error("Hive Kerberos authentication failed " + 
ex.getMessage(), ex);
    +                throw new IllegalArgumentException(ex);
    +            }
    +        }
    +
    +        allWriters = new ConcurrentHashMap<HiveEndPoint, HiveWriter>();
    +        String timeoutName = "hive-bolt-%d";
    +        this.callTimeoutPool = Executors.newFixedThreadPool(1,
    +                   new 
ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    +        heartBeatTimer = new Timer();
    +        setupHeartBeatTimer();
    +    }
    +
    +
    --- End diff --
    
    Extra sapce


> Hive Connector Fixes
> --------------------
>
>                 Key: STORM-1030
>                 URL: https://issues.apache.org/jira/browse/STORM-1030
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-hive
>            Reporter: Sriharsha Chintalapani
>            Assignee: Sriharsha Chintalapani
>             Fix For: 0.11.0
>
>
> 1. Schedule Hive transaction heartbeats outside of execute method.
> 2. Fix retiring idleWriters
> 3. Do not call flush if there is no data added to a txnbatch
> 4. Catch any exception and abort transaction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to