Repository: hive
Updated Branches:
  refs/heads/master 9c907769a -> ce36c439c


HIVE-20349 : Implement Retry Logic in HiveDruidSplit for Scan Queries (Nishant Bangarwa via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ce36c439
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ce36c439
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ce36c439

Branch: refs/heads/master
Commit: ce36c439cd00bf516f6293750f8574f1d518cbe3
Parents: 9c90776
Author: Nishant Bangarwa <[email protected]>
Authored: Fri Aug 17 04:55:00 2018 -0700
Committer: Ashutosh Chauhan <[email protected]>
Committed: Wed Sep 19 07:37:03 2018 -0700

----------------------------------------------------------------------
 .../druid/io/DruidQueryBasedInputFormat.java    |   7 +-
 .../druid/serde/DruidQueryRecordReader.java     | 105 +++++++++++--------
 ...uidQueryBasedInputFormatToAddFaultyHost.java |  53 ++++++++++
 ...QTestDruidStorageHandlerToAddFaultyHost.java |  34 ++++++
 .../clientpositive/druidmini_test_alter.q       |   2 +-
 .../druid/druidmini_test_alter.q.out            |   4 +-
 6 files changed, 157 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ce36c439/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 0125c24..f5009a2 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -106,7 +106,7 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
   }
 
   @SuppressWarnings("deprecation")
-  private HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
+  protected HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
     String address = HiveConf.getVar(conf,
             HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS
     );
@@ -223,10 +223,13 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     final HiveDruidSplit[] splits = new HiveDruidSplit[segmentDescriptors.size()];
     for (int i = 0; i < numSplits; i++) {
       final LocatedSegmentDescriptor locatedSD = segmentDescriptors.get(i);
-      final String[] hosts = new String[locatedSD.getLocations().size()];
+      final String[] hosts = new String[locatedSD.getLocations().size() + 1];
       for (int j = 0; j < locatedSD.getLocations().size(); j++) {
         hosts[j] = locatedSD.getLocations().get(j).getHost();
       }
+      // Default to broker if all other hosts fail.
+      hosts[locatedSD.getLocations().size()] = address;
+
       // Create partial Select query
       final SegmentDescriptor newSD = new SegmentDescriptor(
           locatedSD.getInterval(), locatedSD.getVersion(), locatedSD.getPartitionNumber());

http://git-wip-us.apache.org/repos/asf/hive/blob/ce36c439/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
index 6b6fa3d..8c10261 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java
@@ -105,18 +105,53 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends C
     Preconditions.checkNotNull(query);
     this.resultsType = getResultTypeDef();
     this.httpClient = Preconditions.checkNotNull(httpClient, "need Http Client");
-    // Execute query
-    LOG.debug("Retrieving data from druid using query:\n " + query);
-    final String address = hiveDruidSplit.getLocations()[0];
-    if (Strings.isNullOrEmpty(address)) {
-      throw new IOException("can not fetch results form empty or null host value");
+    final String[] locations = hiveDruidSplit.getLocations();
+    boolean initialized = false;
+    int currentLocationIndex = 0;
+    Exception ex = null;
+    while (!initialized && currentLocationIndex < locations.length) {
+      String address = locations[currentLocationIndex++];
+      if(Strings.isNullOrEmpty(address)) {
+        throw new IOException("can not fetch results from empty or null host value");
+      }
+      // Execute query
+      LOG.debug("Retrieving data from druid location[{}] using query:[{}] ", address, query);
+      try {
+        Request request = DruidStorageHandlerUtils.createSmileRequest(address, query);
+        Future<InputStream> inputStreamFuture = this.httpClient
+                .go(request, new InputStreamResponseHandler());
+        queryResultsIterator = new JsonParserIterator(this.smileMapper, resultsType,
+                inputStreamFuture, request.getUrl().toString(), query
+        );
+        queryResultsIterator.init();
+        initialized = true;
+      } catch (IOException | ExecutionException | InterruptedException e) {
+        if(queryResultsIterator != null) {
+          // We got exception while querying results from this host.
+          queryResultsIterator.close();
+        }
+        LOG.error("Failure getting results for query[{}] from host[{}] because of [{}]",
+                query,
+                address,
+                e.getMessage()
+        );
+        if(ex == null) {
+          ex = e;
+        } else {
+          ex.addSuppressed(e);
+        }
+      }
+    }
+
+    if(!initialized) {
+      throw new RE(
+              ex,
+              "Failure getting results for query[%s] from locations[%s] because of [%s]",
+              query,
+              String.join(", ", locations),
+              ex == null ? "no split locations" : ex.getMessage()
+      );
     }
-    Request request = DruidStorageHandlerUtils.createSmileRequest(address, query);
-    Future<InputStream> inputStreamFuture = this.httpClient
-            .go(request, new InputStreamResponseHandler());
-    queryResultsIterator = new JsonParserIterator(this.smileMapper, resultsType, inputStreamFuture,
-            request.getUrl().toString(), query
-    );
   }
 
   public void initialize(InputSplit split, Configuration conf) throws IOException {
@@ -207,8 +242,6 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends C
     @Override
     public boolean hasNext()
     {
-      init();
-
       if (jp.isClosed()) {
         return false;
       }
@@ -223,8 +256,6 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends C
     @Override
     public R next()
     {
-      init();
-
       try {
         final R retVal = objectCodec.readValue(jp, typeRef);
         jp.nextToken();
@@ -241,35 +272,23 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends C
       throw new UnsupportedOperationException();
     }
 
-    private void init()
-    {
-      if (jp == null) {
-        try {
-          InputStream is = future.get();
-          if (is == null) {
-            throw  new IOException(String.format("query[%s] url[%s] timed out", query, url));
-          } else {
-            jp = mapper.getFactory().createParser(is).configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, true);
-          }
-          final JsonToken nextToken = jp.nextToken();
-          if (nextToken == JsonToken.START_OBJECT) {
-            QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
-            throw new QueryInterruptedException(cause);
-          } else if (nextToken != JsonToken.START_ARRAY) {
-            throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
-          } else {
-            jp.nextToken();
-            objectCodec = jp.getCodec();
-          }
+    private void init() throws IOException, ExecutionException, InterruptedException {
+      if(jp == null) {
+        InputStream is = future.get();
+        if(is == null) {
+          throw new IOException(String.format("query[%s] url[%s] timed out", query, url));
+        } else {
+          jp = mapper.getFactory().createParser(is).configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, true);
         }
-        catch (IOException | InterruptedException | ExecutionException e) {
-          throw new RE(
-                  e,
-                  "Failure getting results for query[%s] url[%s] because of [%s]",
-                  query,
-                  url,
-                  e.getMessage()
-          );
+        final JsonToken nextToken = jp.nextToken();
+        if(nextToken == JsonToken.START_OBJECT) {
+          QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
+          throw new QueryInterruptedException(cause);
+        } else if(nextToken != JsonToken.START_ARRAY) {
+          throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
+        } else {
+          jp.nextToken();
+          objectCodec = jp.getCodec();
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/ce36c439/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidQueryBasedInputFormatToAddFaultyHost.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidQueryBasedInputFormatToAddFaultyHost.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidQueryBasedInputFormatToAddFaultyHost.java
new file mode 100644
index 0000000..b47982f
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidQueryBasedInputFormatToAddFaultyHost.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
+import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This input format adds a faulty host as first input split location.
+ * Tests should be able to query results successfully by trying next split locations.
+ */
+public class QTestDruidQueryBasedInputFormatToAddFaultyHost extends DruidQueryBasedInputFormat {
+
+  @Override
+  protected HiveDruidSplit[] getInputSplits(Configuration conf) throws IOException {
+    HiveDruidSplit[] inputSplits = super.getInputSplits(conf);
+    List<HiveDruidSplit> list = new ArrayList<>();
+    for(HiveDruidSplit split : inputSplits) {
+      String[] locations = split.getLocations();
+      List<String> locationsWithFaultyHost = Lists.newArrayListWithCapacity(locations.length + 1);
+      // A non-queryable host location.
+      locationsWithFaultyHost.add("localhost:8081");
+      locationsWithFaultyHost.addAll(Arrays.asList(locations));
+      HiveDruidSplit hiveDruidSplit = new HiveDruidSplit(split.getDruidQuery(), split.getPath(),
+              locationsWithFaultyHost.toArray(new String[0])
+      );
+      list.add(hiveDruidSplit);
+    }
+    return list.toArray(new HiveDruidSplit[0]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ce36c439/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandlerToAddFaultyHost.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandlerToAddFaultyHost.java b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandlerToAddFaultyHost.java
new file mode 100644
index 0000000..e0dec01
--- /dev/null
+++ b/druid-handler/src/test/org/apache/hadoop/hive/druid/QTestDruidStorageHandlerToAddFaultyHost.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid;
+
+import org.apache.hadoop.mapred.InputFormat;
+
+/**
+ * Storage handler for Druid to be used in tests.
+ * It uses an input format that adds a faulty host as first input split location.
+ * Tests should be able to query results successfully by trying next split locations.
+ */
+@SuppressWarnings("deprecation")
+public class QTestDruidStorageHandlerToAddFaultyHost extends DruidStorageHandler {
+
+  @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return QTestDruidQueryBasedInputFormatToAddFaultyHost.class;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/ce36c439/ql/src/test/queries/clientpositive/druidmini_test_alter.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/druidmini_test_alter.q b/ql/src/test/queries/clientpositive/druidmini_test_alter.q
index 0d8623d..5f8f264 100644
--- a/ql/src/test/queries/clientpositive/druidmini_test_alter.q
+++ b/ql/src/test/queries/clientpositive/druidmini_test_alter.q
@@ -2,7 +2,7 @@
 SET hive.ctas.external.tables=true;
 SET hive.external.table.purge.default = true;
 CREATE EXTERNAL TABLE druid_alltypesorc_n0
-STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandlerToAddFaultyHost'
 TBLPROPERTIES ("druid.segment.granularity" = "HOUR", "druid.query.granularity" = "MINUTE")
 AS
   SELECT cast (`ctimestamp2` as timestamp with local time zone) as `__time`,

http://git-wip-us.apache.org/repos/asf/hive/blob/ce36c439/ql/src/test/results/clientpositive/druid/druidmini_test_alter.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/druid/druidmini_test_alter.q.out b/ql/src/test/results/clientpositive/druid/druidmini_test_alter.q.out
index a175f76..0627cfd 100644
--- a/ql/src/test/results/clientpositive/druid/druidmini_test_alter.q.out
+++ b/ql/src/test/results/clientpositive/druid/druidmini_test_alter.q.out
@@ -1,5 +1,5 @@
 PREHOOK: query: CREATE EXTERNAL TABLE druid_alltypesorc_n0
-STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandlerToAddFaultyHost'
 TBLPROPERTIES ("druid.segment.granularity" = "HOUR", "druid.query.granularity" = "MINUTE")
 AS
   SELECT cast (`ctimestamp2` as timestamp with local time zone) as `__time`,
@@ -17,7 +17,7 @@ PREHOOK: Input: default@alltypesorc
 PREHOOK: Output: database:default
 PREHOOK: Output: default@druid_alltypesorc_n0
 POSTHOOK: query: CREATE EXTERNAL TABLE druid_alltypesorc_n0
-STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
+STORED BY 'org.apache.hadoop.hive.druid.QTestDruidStorageHandlerToAddFaultyHost'
 TBLPROPERTIES ("druid.segment.granularity" = "HOUR", "druid.query.granularity" = "MINUTE")
 AS
   SELECT cast (`ctimestamp2` as timestamp with local time zone) as `__time`,

Reply via email to