[ 
https://issues.apache.org/jira/browse/STORM-1030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089377#comment-15089377
 ] 

ASF GitHub Bot commented on STORM-1030:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/871#discussion_r49201617
  
    --- Diff: 
external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java ---
    @@ -25,75 +26,40 @@
     import backtype.storm.topology.OutputFieldsDeclarer;
     import backtype.storm.utils.TupleUtils;
     import backtype.storm.Config;
    +import org.apache.storm.hive.common.HiveConnector;
     import org.apache.storm.hive.common.HiveWriter;
    -import com.google.common.util.concurrent.ThreadFactoryBuilder;
     import org.apache.hive.hcatalog.streaming.*;
     import org.apache.storm.hive.common.HiveOptions;
     import org.apache.storm.hive.common.HiveUtils;
    -import org.apache.hadoop.security.UserGroupInformation;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.util.List;
    -import java.util.ArrayList;
     import java.util.Map;
    -import java.util.HashMap;
    -import java.util.Timer;
    -import java.util.TimerTask;
    -import java.util.Map.Entry;
    -import java.util.concurrent.ExecutorService;
    -import java.util.concurrent.Executors;
    -import java.util.concurrent.TimeUnit;
    -import java.util.concurrent.atomic.AtomicBoolean;
    -import java.util.List;
     import java.util.LinkedList;
    -import java.io.IOException;
     
     public class HiveBolt extends  BaseRichBolt {
         private static final Logger LOG = 
LoggerFactory.getLogger(HiveBolt.class);
         private OutputCollector collector;
         private HiveOptions options;
    -    private ExecutorService callTimeoutPool;
    -    private transient Timer heartBeatTimer;
    -    private Boolean kerberosEnabled = false;
    -    private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
    -    private UserGroupInformation ugi = null;
    -    HashMap<HiveEndPoint, HiveWriter> allWriters;
    +    private HiveConnector hiveConnector;
         private List<Tuple> tupleBatch;
    +    transient CountMetric serializationErrorMetric;
     
         public HiveBolt(HiveOptions options) {
             this.options = options;
             tupleBatch = new LinkedList<>();
    +
         }
     
         @Override
         public void prepare(Map conf, TopologyContext topologyContext, 
OutputCollector collector)  {
             try {
    -            if(options.getKerberosPrincipal() == null && 
options.getKerberosKeytab() == null) {
    -                kerberosEnabled = false;
    -            } else if(options.getKerberosPrincipal() != null && 
options.getKerberosKeytab() != null) {
    -                kerberosEnabled = true;
    -            } else {
    -                throw new IllegalArgumentException("To enable Kerberos, 
need to set both KerberosPrincipal " +
    -                                                   " & KerberosKeytab");
    -            }
    -
    -            if (kerberosEnabled) {
    -                try {
    -                    ugi = 
HiveUtils.authenticate(options.getKerberosKeytab(), 
options.getKerberosPrincipal());
    -                } catch(HiveUtils.AuthenticationFailed ex) {
    -                    LOG.error("Hive Kerberos authentication failed " + 
ex.getMessage(), ex);
    -                    throw new IllegalArgumentException(ex);
    -                }
    -            }
                 this.collector = collector;
    -            allWriters = new HashMap<HiveEndPoint,HiveWriter>();
    -            String timeoutName = "hive-bolt-%d";
    -            this.callTimeoutPool = Executors.newFixedThreadPool(1,
    -                                new 
ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    -            heartBeatTimer = new Timer();
    -            setupHeartBeatTimer();
    -
    +            this.hiveConnector  =  new HiveConnector(this.options);
    +            this.hiveConnector.configure();
    +            this.serializationErrorMetric = new CountMetric();
    +            
topologyContext.registerMetric("hive_serialization_error_count", 
serializationErrorMetric, 0);
    --- End diff --
    
    I'm not really sure what registering a metric with a time bucket size of 
0 seconds (the third argument to registerMetric) does.


> Hive Connector Fixes
> --------------------
>
>                 Key: STORM-1030
>                 URL: https://issues.apache.org/jira/browse/STORM-1030
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-hive
>            Reporter: Sriharsha Chintalapani
>            Assignee: Sriharsha Chintalapani
>             Fix For: 0.11.0
>
>
> 1. Schedule Hive transaction heartbeats outside of the execute method.
> 2. Fix retiring of idleWriters.
> 3. Do not call flush if no data has been added to a txnbatch.
> 4. Catch any exception and abort the transaction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to