[ 
https://issues.apache.org/jira/browse/HIVE-15658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15832465#comment-15832465
 ] 

Sergey Shelukhin commented on HIVE-15658:
-----------------------------------------

In the last catch of the SessionState.java part of the patch, can you add a 
call to tezSessionState.close, if present, protected by try-catch? 
Doesn't look like it should fail leaving it in open state, but I wonder if open 
(and esp. beginOpen) calls are atomic... we'd rather not have that leak. 

Also why is HiveSessionImpl change necessary? As far as I understand, the 
session at that point is already valid. At least, it's retained in the field at 
the same time as we detach it.

> hive.ql.session.SessionState start() is not atomic, SessionState thread local 
> variable can get into inconsistent state
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-15658
>                 URL: https://issues.apache.org/jira/browse/HIVE-15658
>             Project: Hive
>          Issue Type: Bug
>          Components: API, HCatalog
>    Affects Versions: 1.1.0, 1.2.1, 2.0.0, 2.0.1
>         Environment: CDH5.8.0, Flume 1.6.0, Hive 1.1.0
>            Reporter: Michal Klempa
>         Attachments: HIVE-15658_branch-1.2_1.patch, 
> HIVE-15658_branch-2.1_1.patch
>
>
> Method start() in hive.ql.session.SessionState is supposed to setup needed 
> preconditions, like HDFS scratch directories for session.
> This happens to be not an atomic operation with setting thread local 
> variable, which can later be obtained by calling SessionState.get().
> Therefore, even is the start() method itself fails, the SessionState.get() 
> does not return null and further re-use of the thread which previously 
> invoked start() may lead to obtaining SessionState object in inconsistent 
> state.
> I have observed this using Flume Hive Sink, which uses Hive Streaming 
> interface. When the directory /tmp/hive is not writable by session user, the 
> start() method fails (throwing RuntimeException). If the thread is re-used 
> (like it is in Flume), further executions work with wrongly initialized 
> SessionState object (HDFS dirs are non-existent). In Flume, this happens to 
> me when Flume should create partition if not exists (but the code doing this 
> is in Hive Streaming).
> Steps to reproduce:
> 0. create test spooldir and allow flume to write to it, in my case 
> /home/ubuntu/flume_test, 775, ubuntu:flume
> 1. create Flume config (see attachment)
> 2. create Hive table
> {code}
> create table default.flume_test (column1 string, column2 string) partitioned 
> by (dt string) clustered by (column1) INTO 2 BUCKETS STORED AS ORC;
> {code}
> 3. start flume agent:
> {code}
> bin/flume-ng agent -n a1 -c conf -f conf/flume-config.txt
> {code}
> 4. hdfs dfs -chmod 600 /tmp/hive
> 5. put this file into spooldir:
> {code}
> echo value1,value2 > file1
> {code}
> Expected behavior:
> Exception regarding scratch dir permissions to be thrown repeatedly.
> example (note that the line numbers are wrong as Cloudera is cloning the 
> source codes here https://github.com/cloudera/flume-ng/ and here 
> https://github.com/cloudera/hive):
> {code}
> 2017-01-18 12:39:38,926 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 
> : Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', 
> database='default', table='flume_test', partitionVals=[20170118] }
> org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to 
> EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', 
> table='flume_test', partitionVals=[20170118] } 
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
>         at 
> org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
>         at 
> org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
>         at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
>         at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed 
> connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', 
> database='default', table='flume_test', partitionVals=[20170118] }
>         at 
> org.apache.flume.sink.hive.HiveWriter.newConnection(HiveWriter.java:380)
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:86)
>         ... 6 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: The root 
> scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: 
> rw-------
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:540)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:358)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
>         at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         ... 1 more
> Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on 
> HDFS should be writable. Current permissions are: rw-------
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:625)
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:574)
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:518)
>         ... 13 more
> {code}
> Actual behavior:
> Exception regarding scratch dir permissions thrown once, meaningless 
> exceptions from code, which should be unreachable, are re-thrown again and 
> again, obfuscating the
> source of the problem to the user.
> exceptions thrown repeatedly:
> {code}
> java.lang.NullPointerException: Non-local session path expected to be non-null
>         at 
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:686)
>         at org.apache.hadoop.hive.ql.Context.<init>(Context.java:131)
>         at org.apache.hadoop.hive.ql.Context.<init>(Context.java:118)
>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:411)
>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:312)
>         at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1201)
>         at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1296)
>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1127)
>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1115)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.runDDL(HiveEndPoint.java:404)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:369)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
>         at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> {code}
> 2017-01-18 12:39:44,453 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 
> : Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', 
> database='default', table='flume_test', partitionVals=[20170118] }
> org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to 
> EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', 
> table='flume_test', partitionVals=[20170118] }
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
>         at 
> org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
>         at 
> org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
>         at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
>         at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hive.hcatalog.streaming.StreamingException: partition 
> values=[20170118]. Unable to get path for end point: [20170118]
>         at 
> org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:162)
>         at 
> org.apache.hive.hcatalog.streaming.AbstractRecordWriter.<init>(AbstractRecordWriter.java:66)
>         at 
> org.apache.hive.hcatalog.streaming.DelimitedInputWriter.<init>(DelimitedInputWriter.java:115)
>         at 
> org.apache.flume.sink.hive.HiveDelimitedTextSerializer.createRecordWriter(HiveDelimitedTextSerializer.java:67)
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:89)
>         ... 6 more
> Caused by: NoSuchObjectException(message:partition values=[20170118])
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60283)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60251)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result.read(ThriftHiveMetastore.java:60182)
>         at 
> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partition(ThriftHiveMetastore.java:1892)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partition(ThriftHiveMetastore.java:1877)
>         at 
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getPartition(HiveMetaStoreClient.java:1171)
>         at 
> org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:157)
>         ... 10 more
> {code}
> Detailed description on whats going on:
> Flume, as the Hive Streaming client, does the streaming in the HiveSink 
> class, main part is done on line
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L253
> where one "Batch" is drained (batch in sense of flume batch of incoming 
> messages from channel).
> Main for loop for batch drain is:
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L282
> Flume creates hive endpoint for each line it tries to insert into Hive 
> (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L290),
>  not very effective, but, the .equals in HiveEndPoint is properly written, so 
> everything works.
> Then, it creates the helper HiveWriter 
> (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L295),
>  which
> is cached - one for each HiveEndPoint, if no HiveWriter for endpoint exists, 
> it is created on line
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L343
> Inspecting the constructor of HiveWriter, brings us to creating new 
> connection to Hive using the Streaming API:
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L86
> The connection is created in a separate thread:
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L376
> as the submitted Future 
> (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L425)
> into the thread pool callTimeoutPool (the pool comes from HiveWriter 
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java#L493
>  and is of constant size 1, which seems like Flume is using 1 thread per Hive 
> Sink to talk with Hive.
> When creating newConnection 
> (https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L379),
> with the request of autoCreatePartitions=true, the HiveEndPoint, the entry 
> point to Hive Streaming is called : 
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L105
> As I was testing non-authenticated, it boils to 
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L192
> and finally to 
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L215
> Constructor for inner private class ConnectionImpl then tries to create 
> partition if it not exists, on the line 
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L318
> And the trouble starts in method createPartitionIfNotExists on line
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L455
> as the SessionState.get() returns null - we did not started the session yet, 
> we try to create a new one.
> In SessionState.start() first thing done is registering the object itself as 
> the threadlocal variable:
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L526
> Thereafter, the directories (scratchdir and subdirs) are tried to be created:
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L548
> but if this fails, the RuntimeException (from 
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L619
>  and 
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L677)
>  is not caught in the catch blocks (nor there is any finally block).
> So basically, SessionState.start() has failed with proper initialization 
> (e.g. HDFS dirs are not created, nor is the SessionState.hdfsSessionPath set 
> to non-null) and yet the execution continues.
> With RuntimeException thrown from .start() method, the caller (HiveEndPoint) 
> propagates the exception back to the HiveWriter 
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L379
> The exception is caught 
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L442
>  but handled only as do logging and go on: 
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L456
> This is the moment this exception is logged:
> {code}
> Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on 
> HDFS should be writable. Current permissions are: rw-------
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:625)
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:574)
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:518)
>         ... 13 more
> {code}
> What happens next? Flume re-runs the delivery, calling HiveSink.process, 
> boiling into newConnection again. But Flume uses the SAME and exact one 
> thread it used before to do this.
> This time, the if clause: 
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L454
> returns true, as the SessionState.get() return the incorrectly initialized 
> SessionState from previous attempt.
> Then, it goes into 
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L466
>  and down to the 
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L738
>  which fails on null value of hdfsSessionPath in SessionState.
> But this RuntimeException (NullPointerException) is not caught by 
> https://github.com/apache/hive/blob/master/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java#L470
>  and so it is logged:
> {code}
> 2017-01-18 12:39:44,194 ERROR org.apache.hadoop.hive.ql.Driver: FAILED: 
> NullPointerException Non-local session path expected to be non-null
> java.lang.NullPointerException: Non-local session path expected to be non-null
>         at 
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>         at 
> org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:686)
>         at org.apache.hadoop.hive.ql.Context.<init>(Context.java:131)
>         at org.apache.hadoop.hive.ql.Context.<init>(Context.java:118)
>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:411)
>         at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:312)
>         at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1201)
>         at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1296)
>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1127)
>         at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1115)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.runDDL(HiveEndPoint.java:404)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.createPartitionIfNotExists(HiveEndPoint.java:369)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:276)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.<init>(HiveEndPoint.java:243)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnectionImpl(HiveEndPoint.java:180)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:157)
>         at 
> org.apache.hive.hcatalog.streaming.HiveEndPoint.newConnection(HiveEndPoint.java:110)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:376)
>         at org.apache.flume.sink.hive.HiveWriter$8.call(HiveWriter.java:373)
>         at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> Sometimes, Flume manages to run through the 
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L86
>  as the newConnection is created in separate thread, the Flume rushes into 
> https://github.com/apache/flume/blob/release-1.7.0/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java#L89
>  creating another meaningless exception:
> {code}
> 2017-01-18 12:39:44,453 WARN org.apache.flume.sink.hive.HiveSink: sink_hive_1 
> : Failed connecting to EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', 
> database='default', table='flume_test', partitionVals=[20170118] }
> org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to 
> EndPoint {metaStoreUri='thrift://n02.cdh.ideata:9083', database='default', 
> table='flume_test', partitionVals=[20170118] }
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
>         at 
> org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:344)
>         at 
> org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
>         at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
>         at 
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.hive.hcatalog.streaming.StreamingException: partition 
> values=[20170118]. Unable to get path for end point: [20170118]
>         at 
> org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:162)
>         at 
> org.apache.hive.hcatalog.streaming.AbstractRecordWriter.<init>(AbstractRecordWriter.java:66)
>         at 
> org.apache.hive.hcatalog.streaming.DelimitedInputWriter.<init>(DelimitedInputWriter.java:115)
>         at 
> org.apache.flume.sink.hive.HiveDelimitedTextSerializer.createRecordWriter(HiveDelimitedTextSerializer.java:67)
>         at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:89)
>         ... 6 more
> Caused by: NoSuchObjectException(message:partition values=[20170118])
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60283)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result$get_partition_resultStandardScheme.read(ThriftHiveMetastore.java:60251)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_partition_result.read(ThriftHiveMetastore.java:60182)
>         at 
> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partition(ThriftHiveMetastore.java:1892)
>         at 
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partition(ThriftHiveMetastore.java:1877)
>         at 
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getPartition(HiveMetaStoreClient.java:1171)
>         at 
> org.apache.hive.hcatalog.streaming.AbstractRecordWriter.getPathForEndPoint(AbstractRecordWriter.java:157)
>         ... 10 more
> {code}
> Proposing solution:
> If Hive Streaming API is allowed to be used with same thread again (which 
> probably is), then the threadlocal set in 
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L526
> has to be unset in case of any exception in proceeding blocks:
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L539
> so set the thread local back to null before rethrowing exceptions here:
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L568
> and here:
> https://github.com/apache/hive/blob/release-2.0.1/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java#L602
> Links to source codes are from latest version, although I have been doing 
> testing on Hive 1.1.0. From code, it seems like
> bug has to be present also in recent versions.
> If Hive Streaming API is not allowed to be called by reusing threads, then 
> not only Flume, but probably also NiFi client 
> (https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java#L237)
>  has to be fixed (well, NiFi just copy&pasted the Flume codebase, is there 
> any other copy of this HiveWriter out there?).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to