Repository: hive Updated Branches: refs/heads/master 05d4e4ebc -> 076f4d2d7
Revert "BUG-108021 / BUG-108287 / HIVE-20383 : Invalid queue name and synchronisation issues in hive proto events hook. (Harish JP, reviewd by Anishek Agarwal)" This reverts commit 05d4e4ebcec1c97e3b4e86e7e1fc0717e5b13d05. Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/076f4d2d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/076f4d2d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/076f4d2d Branch: refs/heads/master Commit: 076f4d2d7f447b25b5e5d9ff74359a5ed8043945 Parents: 05d4e4e Author: Anishek Agarwal <[email protected]> Authored: Thu Aug 16 12:32:01 2018 +0530 Committer: Anishek Agarwal <[email protected]> Committed: Thu Aug 16 12:32:01 2018 +0530 ---------------------------------------------------------------------- .../hive/ql/hooks/HiveProtoLoggingHook.java | 51 ++++++------ .../hive/ql/hooks/TestHiveProtoLoggingHook.java | 88 ++------------------ 2 files changed, 30 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/076f4d2d/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java index 45e1ab3..155b2be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java @@ -118,12 +118,10 @@ import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry; import org.apache.hadoop.hive.ql.parse.ExplainConfiguration; import org.apache.hadoop.hive.ql.plan.ExplainWork; import org.apache.hadoop.hive.ql.plan.HiveOperation; -import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hive.common.util.ShutdownHookManager; -import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger; import org.apache.tez.dag.history.logging.proto.ProtoMessageWriter; import org.json.JSONObject; @@ -182,6 +180,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { private final Clock clock; private final String logFileName; private final DatePartitionedLogger<HiveHookEventProto> logger; + private final ExecutorService eventHandler; private final ExecutorService logWriter; private int logFileCount = 0; private ProtoMessageWriter<HiveHookEventProto> writer; @@ -208,6 +207,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { } this.logger = tmpLogger; if (logger == null) { + eventHandler = null; logWriter = null; return; } @@ -216,16 +216,25 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT); ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Hive Hook Proto Event Handler %d").build(); + eventHandler = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory); + + threadFactory = new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Hive Hook Proto Log Writer %d").build(); logWriter = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory); } void shutdown() { - if (logWriter != null) { - logWriter.shutdown(); + // Wait for all the events to be written off, the order of service is important + for (ExecutorService service : new ExecutorService[] {eventHandler, logWriter}) { + if (service == null) { + continue; + } + service.shutdown(); try { - logWriter.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); + service.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); } catch (InterruptedException e) { LOG.warn("Got interrupted exception while waiting for events to be flushed", e); } @@ -237,9 +246,14 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { if (logger == null) { return; } - // Note: same hookContext object is used for all the events for a given query, if we try to - // do it async we have concurrency issues and when query cache is enabled, post event comes - // before we start the pre hook processing and causes inconsistent events publishing. + try { + eventHandler.execute(() -> generateEvent(hookContext)); + } catch (RejectedExecutionException e) { + LOG.warn("Handler queue full ignoring event: " + hookContext.getHookType()); + } + } + + private void generateEvent(HookContext hookContext) { QueryPlan plan = hookContext.getQueryPlan(); if (plan == null) { LOG.debug("Received null query plan."); @@ -329,10 +343,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { builder.setHiveQueryId(plan.getQueryId()); builder.setUser(getUser(hookContext)); builder.setRequestUser(getRequestUser(hookContext)); - String queueName = getQueueName(executionMode, conf); - if (queueName != null) { - builder.setQueue(queueName); - } + builder.setQueue(conf.get("mapreduce.job.queuename")); builder.setExecutionMode(executionMode.name()); builder.addAllTablesRead(getTablesFromEntitySet(hookContext.getInputs())); builder.addAllTablesWritten(getTablesFromEntitySet(hookContext.getOutputs())); @@ -374,6 +385,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { ApplicationId llapId = determineLlapId(conf, executionMode); if (llapId != null) { addMapEntry(builder, OtherInfoType.LLAP_APP_ID, llapId.toString()); + builder.setQueue(conf.get(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname)); } conf.stripHiddenConfigurations(conf); @@ -427,21 +439,6 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { return requestuser; } - private String getQueueName(ExecutionMode mode, HiveConf conf) { - switch (mode) { - case LLAP: - return conf.get(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname); - case MR: - return conf.get(MRJobConfig.QUEUE_NAME); - case TEZ: - return conf.get(TezConfiguration.TEZ_QUEUE_NAME); - case SPARK: - case NONE: - default: - return null; - } - } - private List<String> getTablesFromEntitySet(Set<? extends Entity> entities) { List<String> tableNames = new ArrayList<>(); for (Entity entity : entities) { http://git-wip-us.apache.org/repos/asf/hive/blob/076f4d2d/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java index a5939fa..8124528 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.hooks; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -29,22 +30,15 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.QueryState; -import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; -import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventLogger; import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventType; -import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.ExecutionMode; import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.OtherInfoType; import org.apache.hadoop.hive.ql.hooks.HookContext.HookType; import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.HiveHookEventProto; import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.HiveOperation; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.TezWork; -import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger; import org.apache.tez.dag.history.logging.proto.ProtoMessageReader; import org.junit.Assert; @@ -69,9 +63,6 @@ public class TestHiveProtoLoggingHook { @Before public void setup() throws Exception { conf = new HiveConf(); - conf.set(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, "llap_queue"); - conf.set(MRJobConfig.QUEUE_NAME, "mr_queue"); - conf.set(TezConfiguration.TEZ_QUEUE_NAME, "tez_queue"); tmpFolder = folder.newFolder().getAbsolutePath(); conf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder); QueryState state = new QueryState.Builder().withHiveConf(conf).build(); @@ -103,8 +94,7 @@ public class TestHiveProtoLoggingHook { Assert.assertEquals("test_user", event.getRequestUser()); Assert.assertEquals("test_queryId", event.getHiveQueryId()); Assert.assertEquals("test_op_id", event.getOperationId()); - Assert.assertEquals(ExecutionMode.NONE.name(), event.getExecutionMode()); - Assert.assertFalse(event.hasQueue()); + Assert.assertEquals("NONE", event.getExecutionMode()); assertOtherInfo(event, OtherInfoType.TEZ, Boolean.FALSE.toString()); assertOtherInfo(event, OtherInfoType.MAPRED, Boolean.FALSE.toString()); @@ -118,69 +108,6 @@ public class TestHiveProtoLoggingHook { } @Test - public void testQueueLogs() throws Exception { - context.setHookType(HookType.PRE_EXEC_HOOK); - EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance()); - - // This makes it MR task - context.getQueryPlan().getRootTasks().add(new ExecDriver()); - evtLogger.handle(context); - - // This makes it Tez task - MapWork mapWork = new MapWork(); - TezWork tezWork = new TezWork("test_queryid"); - tezWork.add(mapWork); - TezTask task = new TezTask(); - task.setId("id1"); - task.setWork(tezWork); - context.getQueryPlan().getRootTasks().add(task); - context.getQueryPlan().getRootTasks().add(new TezTask()); - evtLogger.handle(context); - - // This makes it llap task - mapWork.setLlapMode(true); - evtLogger.handle(context); - - evtLogger.shutdown(); - - ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder); - - HiveHookEventProto event = reader.readEvent(); - Assert.assertNotNull(event); - Assert.assertEquals(ExecutionMode.MR.name(), event.getExecutionMode()); - Assert.assertEquals(event.getQueue(), "mr_queue"); - - event = reader.readEvent(); - Assert.assertNotNull(event); - Assert.assertEquals(ExecutionMode.TEZ.name(), event.getExecutionMode()); - Assert.assertEquals(event.getQueue(), "tez_queue"); - - event = reader.readEvent(); - Assert.assertNotNull(event); - Assert.assertEquals(ExecutionMode.LLAP.name(), event.getExecutionMode()); - Assert.assertEquals(event.getQueue(), "llap_queue"); - } - - @Test - public void testPreAndPostEventBoth() throws Exception { - context.setHookType(HookType.PRE_EXEC_HOOK); - EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance()); - evtLogger.handle(context); - context.setHookType(HookType.POST_EXEC_HOOK); - evtLogger.handle(context); - evtLogger.shutdown(); - - ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder); - HiveHookEventProto event = reader.readEvent(); - Assert.assertNotNull("Pre hook event not found", event); - Assert.assertEquals(EventType.QUERY_SUBMITTED.name(), event.getEventType()); - - event = reader.readEvent(); - Assert.assertNotNull("Post hook event not found", event); - Assert.assertEquals(EventType.QUERY_COMPLETED.name(), event.getEventType()); - } - - @Test public void testPostEventLog() throws Exception { context.setHookType(HookType.POST_EXEC_HOOK); context.getPerfLogger().PerfLogBegin("test", "LogTest"); @@ -224,21 +151,18 @@ public class TestHiveProtoLoggingHook { assertOtherInfo(event, OtherInfoType.PERF, null); } - private ProtoMessageReader<HiveHookEventProto> getTestReader(HiveConf conf, String tmpFolder) - throws IOException { + private HiveHookEventProto loadEvent(HiveConf conf, String tmpFolder) + throws IOException, FileNotFoundException { Path path = new Path(tmpFolder); FileSystem fs = path.getFileSystem(conf); FileStatus[] status = fs.listStatus(path); Assert.assertEquals(1, status.length); status = fs.listStatus(status[0].getPath()); Assert.assertEquals(1, status.length); + DatePartitionedLogger<HiveHookEventProto> logger = new DatePartitionedLogger<>( HiveHookEventProto.PARSER, path, conf, SystemClock.getInstance()); - return logger.getReader(status[0].getPath()); - } - - private HiveHookEventProto loadEvent(HiveConf conf, String tmpFolder) throws IOException { - ProtoMessageReader<HiveHookEventProto> reader = getTestReader(conf, tmpFolder); + ProtoMessageReader<HiveHookEventProto> reader = logger.getReader(status[0].getPath()); HiveHookEventProto event = reader.readEvent(); Assert.assertNotNull(event); return event;
