Repository: hive
Updated Branches:
  refs/heads/master 86b678f50 -> 1eea5a80d


HIVE-19175 : Assert that Insert into Druid Table fails if the publishing of 
metadata by HS2 fails (Slim Bouguerra via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1eea5a80
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1eea5a80
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1eea5a80

Branch: refs/heads/master
Commit: 1eea5a80ded2df33d57b2296b3bed98cb18383fd
Parents: 86b678f
Author: Slim Bouguerra <slim.bougue...@gmail.com>
Authored: Tue Apr 10 17:03:00 2018 -0700
Committer: Ashutosh Chauhan <hashut...@apache.org>
Committed: Sat Apr 14 10:07:23 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/druid/DruidStorageHandler.java  |  69 +++++++-----
 .../hive/druid/DruidStorageHandlerUtils.java    |   9 +-
 .../hadoop/hive/druid/io/DruidRecordWriter.java |   2 +-
 .../druid/DruidStorageHandlerUtilsTest.java     |  32 ++++++
 .../clientpositive/druidmini_test_insert.q      |  27 ++++-
 .../druid/druidmini_test_insert.q.out           | 109 +++++++++++++++++++
 6 files changed, 208 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1eea5a80/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
index c0feb8d..33387b2 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java
@@ -26,7 +26,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.metamx.common.RetryUtils;
 import com.metamx.common.lifecycle.Lifecycle;
@@ -118,6 +117,8 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hadoop.hive.druid.DruidStorageHandlerUtils.JSON_MAPPER;
+
 /**
  * DruidStorageHandler provides a HiveStorageHandler implementation for Druid.
  */
@@ -329,7 +330,7 @@ public class DruidStorageHandler extends 
DefaultHiveMetaHook implements HiveStor
             null
         ), "UTF-8");
 
-    Map<String, Object> inputParser = DruidStorageHandlerUtils.JSON_MAPPER
+    Map<String, Object> inputParser = JSON_MAPPER
         .convertValue(inputRowParser, Map.class);
     final DataSchema dataSchema = new DataSchema(
         dataSourceName,
@@ -414,7 +415,7 @@ public class DruidStorageHandler extends 
DefaultHiveMetaHook implements HiveStor
 
   private static void updateKafkaIngestionSpec(String overlordAddress, 
KafkaSupervisorSpec spec) {
     try {
-      String task = 
DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(spec);
+      String task = JSON_MAPPER.writeValueAsString(spec);
       console.printInfo("submitting kafka Spec {}", task);
       LOG.info("submitting kafka Supervisor Spec {}", task);
 
@@ -422,7 +423,7 @@ public class DruidStorageHandler extends 
DefaultHiveMetaHook implements HiveStor
               new URL(String.format("http://%s/druid/indexer/v1/supervisor";, 
overlordAddress)))
               .setContent(
                   "application/json",
-                  
DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsBytes(spec)),
+                  JSON_MAPPER.writeValueAsBytes(spec)),
           new StatusResponseHandler(
               Charset.forName("UTF-8"))).get();
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -504,7 +505,7 @@ public class DruidStorageHandler extends 
DefaultHiveMetaHook implements HiveStor
           input -> input instanceof IOException,
           getMaxRetryCount());
       if (response.getStatus().equals(HttpResponseStatus.OK)) {
-        return DruidStorageHandlerUtils.JSON_MAPPER
+        return JSON_MAPPER
             .readValue(response.getContent(), KafkaSupervisorSpec.class);
         // Druid Returns 400 Bad Request when not found.
       } else if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND) || 
response.getStatus().equals(HttpResponseStatus.BAD_REQUEST)) {
@@ -522,38 +523,47 @@ public class DruidStorageHandler extends 
DefaultHiveMetaHook implements HiveStor
 
 
   protected void loadDruidSegments(Table table, boolean overwrite) throws 
MetaException {
-    // at this point we have Druid segments from reducers but we need to 
atomically
-    // rename and commit to metadata
+
     final String dataSourceName = 
table.getParameters().get(Constants.DRUID_DATA_SOURCE);
-    final List<DataSegment> segmentList = Lists.newArrayList();
-    final Path tableDir = getSegmentDescriptorDir();
-    // Read the created segments metadata from the table staging directory
+    final Path segmentDescriptorDir = getSegmentDescriptorDir();
     try {
-      segmentList.addAll(DruidStorageHandlerUtils.getCreatedSegments(tableDir, 
getConf()));
+      if 
(!segmentDescriptorDir.getFileSystem(getConf()).exists(segmentDescriptorDir)) {
+        LOG.info(
+            "Directory {} does not exist, ignore this if it is create 
statement or inserts of 0 rows,"
+                + " no Druid segments to move, cleaning working directory {}",
+            segmentDescriptorDir.getName(), getStagingWorkingDir().getName()
+        );
+        cleanWorkingDir();
+        return;
+      }
     } catch (IOException e) {
-      LOG.error("Failed to load segments descriptor from directory {}", 
tableDir.toString());
-      Throwables.propagate(e);
+      LOG.error("Failed to load segments descriptor from directory {}", 
segmentDescriptorDir.toString());
       cleanWorkingDir();
+      Throwables.propagate(e);
     }
-    // Moving Druid segments and committing to druid metadata as one 
transaction.
-    final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new 
HdfsDataSegmentPusherConfig();
-    List<DataSegment> publishedDataSegmentList = Lists.newArrayList();
-    final String segmentDirectory =
-            table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != 
null
-                    ? 
table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
-                    : HiveConf.getVar(getConf(), 
HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
-    LOG.info(String.format(
-            "Moving [%s] Druid segments from staging directory [%s] to Deep 
storage [%s]",
-            segmentList.size(),
-            getStagingWorkingDir(),
-            segmentDirectory
 
-            ));
-    hdfsSegmentPusherConfig.setStorageDirectory(segmentDirectory);
     try {
+      // at this point we have Druid segments from reducers but we need to 
atomically
+      // rename and commit to metadata
+      // Moving Druid segments and committing to druid metadata as one 
transaction.
+      List<DataSegment> segmentList = 
DruidStorageHandlerUtils.getCreatedSegments(segmentDescriptorDir, getConf());
+      final HdfsDataSegmentPusherConfig hdfsSegmentPusherConfig = new 
HdfsDataSegmentPusherConfig();
+      List<DataSegment> publishedDataSegmentList;
+      final String segmentDirectory =
+          table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY) != null
+              ? table.getParameters().get(Constants.DRUID_SEGMENT_DIRECTORY)
+              : HiveConf.getVar(getConf(), 
HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
+      LOG.info(String.format(
+          "Moving [%s] Druid segments from staging directory [%s] to Deep 
storage [%s]",
+          segmentList.size(),
+          getStagingWorkingDir(),
+          segmentDirectory
+
+      ));
+      hdfsSegmentPusherConfig.setStorageDirectory(segmentDirectory);
       DataSegmentPusher dataSegmentPusher = new 
HdfsDataSegmentPusher(hdfsSegmentPusherConfig,
               getConf(),
-              DruidStorageHandlerUtils.JSON_MAPPER
+              JSON_MAPPER
       );
       publishedDataSegmentList = 
DruidStorageHandlerUtils.publishSegmentsAndCommit(
               getConnector(),
@@ -564,7 +574,7 @@ public class DruidStorageHandler extends 
DefaultHiveMetaHook implements HiveStor
               getConf(),
               dataSegmentPusher
       );
-
+      checkLoadStatus(publishedDataSegmentList);
     } catch (CallbackFailedException | IOException e) {
       LOG.error("Failed to move segments from staging directory");
       if (e instanceof CallbackFailedException) {
@@ -574,7 +584,6 @@ public class DruidStorageHandler extends 
DefaultHiveMetaHook implements HiveStor
     } finally {
       cleanWorkingDir();
     }
-      checkLoadStatus(publishedDataSegmentList);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/1eea5a80/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
index 1aef565..808351d 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java
@@ -299,14 +299,7 @@ public final class DruidStorageHandlerUtils {
     ImmutableList.Builder<DataSegment> publishedSegmentsBuilder = 
ImmutableList.builder();
     FileSystem fs = taskDir.getFileSystem(conf);
     FileStatus[] fss;
-    try {
-      fss = fs.listStatus(taskDir);
-    } catch (FileNotFoundException e) {
-      // This is a CREATE TABLE statement or query executed for CTAS/INSERT
-      // did not produce any result. We do not need to do anything, this is
-      // expected behavior.
-      return publishedSegmentsBuilder.build();
-    }
+    fss = fs.listStatus(taskDir);
     for (FileStatus fileStatus : fss) {
       final DataSegment segment = JSON_MAPPER
               .readValue((InputStream) fs.open(fileStatus.getPath()), 
DataSegment.class);

http://git-wip-us.apache.org/repos/asf/hive/blob/1eea5a80/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
index 8ab34a8..d1f0d98 100644
--- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
+++ 
b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidRecordWriter.java
@@ -325,7 +325,7 @@ public class DruidRecordWriter implements 
RecordWriter<NullWritable, DruidWritab
 
   @Override
   public void close(Reporter reporter) throws IOException {
-    this.close(true);
+    this.close(false);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/1eea5a80/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerUtilsTest.java
 
b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerUtilsTest.java
new file mode 100644
index 0000000..d079e4f
--- /dev/null
+++ 
b/druid-handler/src/test/org/apache/hadoop/hive/druid/DruidStorageHandlerUtilsTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.druid;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class DruidStorageHandlerUtilsTest {
+
+  @Test public void testCreateSelectStarQuery() throws IOException {
+    Assert.assertTrue("this should not be null",
+        
DruidStorageHandlerUtils.createSelectStarQuery("dummy_ds").contains("dummy_ds"));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/1eea5a80/ql/src/test/queries/clientpositive/druidmini_test_insert.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidmini_test_insert.q 
b/ql/src/test/queries/clientpositive/druidmini_test_insert.q
index f4cb666..c14a1b6 100644
--- a/ql/src/test/queries/clientpositive/druidmini_test_insert.q
+++ b/ql/src/test/queries/clientpositive/druidmini_test_insert.q
@@ -53,7 +53,30 @@ SELECT COUNT(*) FROM druid_alltypesorc;
 
 DROP TABLE druid_alltypesorc;
 
-
+ -- Test create then insert
+ 
+ create database druid_test_create_then_insert;
+ use druid_test_create_then_insert;
+ 
+ create table test_table(`timecolumn` timestamp, `userid` string, `num_l` 
float);
+ 
+ insert into test_table values ('2015-01-08 00:00:00', 'i1-start', 4);
+ insert into test_table values ('2015-01-08 23:59:59', 'i1-end', 1);
+ 
+ CREATE TABLE druid_table (`__time` timestamp with local time zone, `userid` 
string, `num_l` float)
+ STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+ TBLPROPERTIES ("druid.segment.granularity" = "DAY");
+ 
+ 
+ INSERT INTO TABLE druid_table
+ select cast(`timecolumn` as timestamp with local time zone) as `__time`, 
`userid`, `num_l` FROM test_table;
+ 
+ select count(*) FROM druid_table;
+ 
+ DROP TABLE  test_table;
+ DROP TABLE druid_table;
+ DROP DATABASE druid_test_create_then_insert;
+ 
 -- Day light saving time test insert into test
 
 create database druid_test_dst;
@@ -116,3 +139,5 @@ EXPLAIN select * from druid_test_table where `__time` = 
cast('2015-03-10 23:59:5
 
 DROP TABLE test_base_table;
 DROP TABLE druid_test_table;
+
+drop   database druid_test_dst;

http://git-wip-us.apache.org/repos/asf/hive/blob/1eea5a80/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out
----------------------------------------------------------------------
diff --git 
a/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out 
b/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out
index 482554b..c471795 100644
--- a/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidmini_test_insert.q.out
@@ -148,6 +148,107 @@ POSTHOOK: query: DROP TABLE druid_alltypesorc
 POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: default@druid_alltypesorc
 POSTHOOK: Output: default@druid_alltypesorc
+PREHOOK: query: -- Test create then insert
+ 
+ create database druid_test_create_then_insert
+PREHOOK: type: CREATEDATABASE
+PREHOOK: Output: database:druid_test_create_then_insert
+POSTHOOK: query: -- Test create then insert
+ 
+ create database druid_test_create_then_insert
+POSTHOOK: type: CREATEDATABASE
+POSTHOOK: Output: database:druid_test_create_then_insert
+PREHOOK: query: use druid_test_create_then_insert
+PREHOOK: type: SWITCHDATABASE
+PREHOOK: Input: database:druid_test_create_then_insert
+POSTHOOK: query: use druid_test_create_then_insert
+POSTHOOK: type: SWITCHDATABASE
+POSTHOOK: Input: database:druid_test_create_then_insert
+PREHOOK: query: create table test_table(`timecolumn` timestamp, `userid` 
string, `num_l` float)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:druid_test_create_then_insert
+PREHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: query: create table test_table(`timecolumn` timestamp, `userid` 
string, `num_l` float)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:druid_test_create_then_insert
+POSTHOOK: Output: druid_test_create_then_insert@test_table
+PREHOOK: query: insert into test_table values ('2015-01-08 00:00:00', 
'i1-start', 4)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: query: insert into test_table values ('2015-01-08 00:00:00', 
'i1-start', 4)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: Lineage: test_table.num_l SCRIPT []
+POSTHOOK: Lineage: test_table.timecolumn SCRIPT []
+POSTHOOK: Lineage: test_table.userid SCRIPT []
+PREHOOK: query: insert into test_table values ('2015-01-08 23:59:59', 
'i1-end', 1)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: query: insert into test_table values ('2015-01-08 23:59:59', 
'i1-end', 1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: Lineage: test_table.num_l SCRIPT []
+POSTHOOK: Lineage: test_table.timecolumn SCRIPT []
+POSTHOOK: Lineage: test_table.userid SCRIPT []
+PREHOOK: query: CREATE TABLE druid_table (`__time` timestamp with local time 
zone, `userid` string, `num_l` float)
+ STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+ TBLPROPERTIES ("druid.segment.granularity" = "DAY")
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:druid_test_create_then_insert
+PREHOOK: Output: druid_test_create_then_insert@druid_table
+POSTHOOK: query: CREATE TABLE druid_table (`__time` timestamp with local time 
zone, `userid` string, `num_l` float)
+ STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+ TBLPROPERTIES ("druid.segment.granularity" = "DAY")
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:druid_test_create_then_insert
+POSTHOOK: Output: druid_test_create_then_insert@druid_table
+PREHOOK: query: INSERT INTO TABLE druid_table
+ select cast(`timecolumn` as timestamp with local time zone) as `__time`, 
`userid`, `num_l` FROM test_table
+PREHOOK: type: QUERY
+PREHOOK: Input: druid_test_create_then_insert@test_table
+PREHOOK: Output: druid_test_create_then_insert@druid_table
+POSTHOOK: query: INSERT INTO TABLE druid_table
+ select cast(`timecolumn` as timestamp with local time zone) as `__time`, 
`userid`, `num_l` FROM test_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: druid_test_create_then_insert@test_table
+POSTHOOK: Output: druid_test_create_then_insert@druid_table
+PREHOOK: query: select count(*) FROM druid_table
+PREHOOK: type: QUERY
+PREHOOK: Input: druid_test_create_then_insert@druid_table
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(*) FROM druid_table
+POSTHOOK: type: QUERY
+POSTHOOK: Input: druid_test_create_then_insert@druid_table
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+2
+PREHOOK: query: DROP TABLE  test_table
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: druid_test_create_then_insert@test_table
+PREHOOK: Output: druid_test_create_then_insert@test_table
+POSTHOOK: query: DROP TABLE  test_table
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: druid_test_create_then_insert@test_table
+POSTHOOK: Output: druid_test_create_then_insert@test_table
+PREHOOK: query: DROP TABLE druid_table
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: druid_test_create_then_insert@druid_table
+PREHOOK: Output: druid_test_create_then_insert@druid_table
+POSTHOOK: query: DROP TABLE druid_table
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: druid_test_create_then_insert@druid_table
+POSTHOOK: Output: druid_test_create_then_insert@druid_table
+PREHOOK: query: DROP DATABASE druid_test_create_then_insert
+PREHOOK: type: DROPDATABASE
+PREHOOK: Input: database:druid_test_create_then_insert
+PREHOOK: Output: database:druid_test_create_then_insert
+POSTHOOK: query: DROP DATABASE druid_test_create_then_insert
+POSTHOOK: type: DROPDATABASE
+POSTHOOK: Input: database:druid_test_create_then_insert
+POSTHOOK: Output: database:druid_test_create_then_insert
 PREHOOK: query: create database druid_test_dst
 PREHOOK: type: CREATEDATABASE
 PREHOOK: Output: database:druid_test_dst
@@ -681,3 +782,11 @@ POSTHOOK: query: DROP TABLE druid_test_table
 POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: druid_test_dst@druid_test_table
 POSTHOOK: Output: druid_test_dst@druid_test_table
+PREHOOK: query: drop   database druid_test_dst
+PREHOOK: type: DROPDATABASE
+PREHOOK: Input: database:druid_test_dst
+PREHOOK: Output: database:druid_test_dst
+POSTHOOK: query: drop   database druid_test_dst
+POSTHOOK: type: DROPDATABASE
+POSTHOOK: Input: database:druid_test_dst
+POSTHOOK: Output: database:druid_test_dst

Reply via email to