Alex Parrill created STORM-3473:
-----------------------------------

             Summary: Hive can't read records written from HiveBolt
                 Key: STORM-3473
                 URL: https://issues.apache.org/jira/browse/STORM-3473
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-hive
    Affects Versions: 2.0.0
            Reporter: Alex Parrill


I'm trying to stream items from storm into hive using the HiveBolt, but Hive 
does not seem to see the records at all.

Test program:
{code:java}
package com.datto.hivetest;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.hive.bolt.HiveBolt;
import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper;
import org.apache.storm.hive.common.HiveOptions;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.streams.StreamBuilder;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Time;

import java.util.Map;
import java.util.Random;

public class MainStorm {
        public static void main(String[] args) throws InvalidTopologyException, 
AuthorizationException, AlreadyAliveException {
                HiveOptions hiveOptions = new HiveOptions(
                        "<url>",
                        "default",
                        "test_table",
                        new JsonRecordHiveMapper()
                                .withColumnFields(new Fields("value"))
                )
                        .withAutoCreatePartitions(true);

                StreamBuilder builder = new StreamBuilder();
                builder.newStream(new TestSpout())
                        .map(tup -> tup.getStringByField("word").toLowerCase())
                        .to(new HiveBolt(hiveOptions));

                Config config = new Config();
                config.setMessageTimeoutSecs(30);
                config.setMaxSpoutPending(1024);
                config.setClasspath("/etc/hadoop/conf/");

                StormSubmitter.submitTopology("hive-test", config, 
builder.build());
        }

        public static class TestSpout extends BaseRichSpout {
                private transient SpoutOutputCollector out;
                private transient Random random;

                @Override
                public void open(Map<String, Object> conf, TopologyContext 
context, SpoutOutputCollector collector) {
                        out = collector;
                        random = new Random();
                }

                @Override
                public void nextTuple() {
                        try {
                                Time.sleep(100);
                        } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                        }

                        final String[] words = new String[]{ "nathan", "mike", 
"jackson", "golda", "bertels" };
                        final String word = words[random.nextInt(words.length)];
                        out.emit(new Values(word));
                }

                @Override
                public void declareOutputFields(OutputFieldsDeclarer declarer) {
                        declarer.declare(new Fields("word"));
                }
        }
}
{code}
Table creation:
{code:sql}
CREATE TABLE test_table (value string) CLUSTERED BY (value) INTO 4 BUCKETS 
STORED AS ORC TBLPROPERTIES('orc.compress' = 'ZLIB', 'transactional' = 'true');

GRANT ALL ON test_table TO USER storm;{code}

Setting the ACL:

{code}
sudo -u hdfs hdfs dfs -setfacl -m user:storm:rwx 
/warehouse/tablespace/managed/hive/test_table
sudo -u hdfs hdfs dfs -setfacl -m default:user:storm:rwx 
/warehouse/tablespace/managed/hive/test_table
{code}

Hive results after running for around 10 minutes:

{code:java}
> SELECT COUNT(*) FROM test_table;
INFO  : Compiling 
command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403): 
SELECT COUNT(*) FROM test_table
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:_c0, 
type:bigint, comment:null)], properties:null)
INFO  : Completed compiling 
command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403); Time 
taken: 1.138 seconds
INFO  : Executing 
command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403): 
SELECT COUNT(*) FROM test_table
INFO  : Completed executing 
command(queryId=hive_20190722195152_2315b4c9-f527-4b6e-8652-151d9c4f6403); Time 
taken: 0.013 seconds
INFO  : OK
+------+
| _c0  |
+------+
| 0    |
+------+
{code}

So hive thinks there are no results, which isn't good. But if I look at hdfs, 
there are some files there:

{code}
# sudo -u hdfs hdfs dfs -ls -R -h /warehouse/tablespace/managed/hive/test_table
drwxrwx---+  - storm hadoop          0 2019-07-22 19:15 
/warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100
-rw-rw----+  3 storm hadoop          1 2019-07-22 19:15 
/warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/_orc_acid_version
-rw-rw----+  3 storm hadoop     74.4 K 2019-07-22 19:27 
/warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00001
-rw-rw----+  3 storm hadoop        376 2019-07-22 19:27 
/warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00001_flush_length
-rw-rw----+  3 storm hadoop     73.4 K 2019-07-22 19:27 
/warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00002
-rw-rw----+  3 storm hadoop        376 2019-07-22 19:27 
/warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00002_flush_length
-rw-rw----+  3 storm hadoop     84.9 K 2019-07-22 19:27 
/warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00003
-rw-rw----+  3 storm hadoop        376 2019-07-22 19:27 
/warehouse/tablespace/managed/hive/test_table/delta_0000001_0000100/bucket_00003_flush_length
{code}

And they seem to have valid rows:

{code}
❯❯❯ ./orc-contents /tmp/bucket_00002  | head
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 0, 
"currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 1, 
"currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 2, 
"currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 3, 
"currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 4, 
"currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 5, 
"currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 6, 
"currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 7, 
"currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 8, 
"currentTransaction": 1, "row": {"value": "bertels"}}
{"operation": 0, "originalTransaction": 1, "bucket": 537001984, "rowId": 9, 
"currentTransaction": 1, "row": {"value": "bertels"}}
{code}

I can insert into the table manually, and I've also written a test java program 
that uses the hive streaming API to write one row, and hive sees those inserts. 
I don't see any errors in the storm logs; the tuples seem to be flushed and 
acked ok. I don't think I've seen any errors in the metastore logs either.

Anyone know what's up? I can get more info if needed.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to