Repository: incubator-gobblin Updated Branches: refs/heads/master f96379e11 -> 067d42233
Pipelined Hive registration; configurable path deduplication in Hive registration; make TaskStateCollectorServiceHandler generic; add some Javadocs Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/067d4223 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/067d4223 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/067d4223 Branch: refs/heads/master Commit: 067d42233fab93b82f62f0569159679a0b5102fa Parents: f96379e Author: Lei Sun <autumn...@gmail.com> Authored: Mon Aug 7 14:55:03 2017 -0700 Committer: Issac Buenrostro <ibuen...@apache.org> Committed: Tue Aug 8 10:47:25 2017 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 1 + .../publisher/HiveRegistrationPublisher.java | 36 +++++++++++-- ...RegTaskStateCollectorServiceHandlerImpl.java | 56 ++++++++++++++++++++ .../runtime/TaskStateCollectorService.java | 41 ++++++++++++-- .../TaskStateCollectorServiceHandler.java | 43 +++++++++++++++ ...teCollectorServiceHiveRegHandlerFactory.java | 33 ++++++++++++ .../runtime/TaskStateCollectorServiceTest.java | 14 +++++ 7 files changed, 218 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/067d4223/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 79e9b98..36921be 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -567,6 +567,7 @@ public class ConfigurationKeys { */ public static 
final String TASK_STATE_COLLECTOR_INTERVAL_SECONDS = "task.state.collector.interval.secs"; public static final int DEFAULT_TASK_STATE_COLLECTOR_INTERVAL_SECONDS = 60; + public static final String TASK_STATE_COLLECTOR_HANDLER_CLASS = "task.state.collector.handler.class"; /** * Configuration properties for email settings. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/067d4223/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java index 281acbe..17370b7 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java @@ -36,6 +36,7 @@ import com.google.common.base.Splitter; import com.google.common.collect.Sets; import com.google.common.io.Closer; +import org.apache.gobblin.annotation.Alias; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; @@ -65,22 +66,47 @@ import lombok.extern.slf4j.Slf4j; * @author Ziyang Liu */ @Slf4j +@Alias("hivereg") public class HiveRegistrationPublisher extends DataPublisher { private static final String DATA_PUBLISH_TIME = HiveRegistrationPublisher.class.getName() + ".lastDataPublishTime"; private static final Splitter LIST_SPLITTER_COMMA = Splitter.on(",").trimResults().omitEmptyStrings(); public static final String HIVE_SPEC_COMPUTATION_TIMER = "hiveSpecComputationTimer"; + private static final String PATH_DEDUPE_ENABLED = "hive.registration.path.dedupe.enabled"; + private static final boolean DEFAULT_PATH_DEDUPE_ENABLED = true; + private final Closer closer = Closer.create(); private final 
HiveRegister hiveRegister; private final ExecutorService hivePolicyExecutor; private final MetricContext metricContext; + /** + * The configuration to determine if path deduplication should be enabled during Hive Registration process. + * Recall that HiveRegistration iterate thru. each topics' data folder and obtain schema from newest partition, + * it might be the case that a table corresponding to a registered path has a schema changed. + * In this case, path-deduplication won't work. + * + * e.g. In streaming mode, there could be cases that files(e.g. avro) under single topic folder carry different schema. + */ + private boolean isPathDedupeEnabled; + + /** + * Make the deduplication of path to be registered in the Publisher level, + * So that each invocation of {@link #publishData(Collection)} contribute paths registered to this set. + */ + private static Set<String> pathsToRegisterFromSingleState = Sets.newHashSet(); + + /** + * @param state This is a Job State + */ public HiveRegistrationPublisher(State state) { super(state); this.hiveRegister = this.closer.register(HiveRegister.get(state)); this.hivePolicyExecutor = ExecutorsUtils.loggingDecorator(Executors.newFixedThreadPool(new HiveRegProps(state).getNumThreads(), ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("HivePolicyExecutor-%d")))); this.metricContext = Instrumented.getMetricContext(state, HiveRegistrationPublisher.class); + + isPathDedupeEnabled = state.getPropAsBoolean(PATH_DEDUPE_ENABLED, this.DEFAULT_PATH_DEDUPE_ENABLED); } @Override @@ -96,6 +122,9 @@ public class HiveRegistrationPublisher extends DataPublisher { @Override public void initialize() throws IOException {} + /** + * @param states This is a collection of TaskState. + */ @Override public void publishData(Collection<? 
extends WorkUnitState> states) throws IOException { CompletionService<Collection<HiveSpec>> completionService = @@ -107,7 +136,7 @@ public class HiveRegistrationPublisher extends DataPublisher { // Here all runtime task-level props are injected into superstate which installed in each Policy Object. // runtime.props are comma-separated props collected in runtime. - Set<String> pathsToRegisterFromSingleState = Sets.newHashSet() ; + int toRegisterPathCount = 0 ; for (State state:states) { State taskSpecificState = state; if (state.contains(ConfigurationKeys.PUBLISHER_DIRS)) { @@ -125,10 +154,11 @@ public class HiveRegistrationPublisher extends DataPublisher { final HiveRegistrationPolicy policy = HiveRegistrationPolicyBase.getPolicy(taskSpecificState); for ( final String path : state.getPropAsList(ConfigurationKeys.PUBLISHER_DIRS) ) { - if (pathsToRegisterFromSingleState.contains(path)){ + if (isPathDedupeEnabled && pathsToRegisterFromSingleState.contains(path)){ continue; } pathsToRegisterFromSingleState.add(path); + toRegisterPathCount += 1; completionService.submit(new Callable<Collection<HiveSpec>>() { @Override public Collection<HiveSpec> call() throws Exception { @@ -141,7 +171,7 @@ public class HiveRegistrationPublisher extends DataPublisher { } else continue; } - for (int i = 0; i < pathsToRegisterFromSingleState.size(); i++) { + for (int i = 0; i < toRegisterPathCount; i++) { try { for (HiveSpec spec : completionService.take().get()) { this.hiveRegister.register(spec); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/067d4223/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java new file mode 100644 
index 0000000..ecbd1a5 --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.publisher.HiveRegistrationPublisher; + + +/** + * A {@link TaskStateCollectorServiceHandler} implementation that execute hive registration on driver level. + * It registers all {@link TaskState} once they are available. + * Since {@link TaskStateCollectorService} is by default being invoked every minute, + * if a single batch of hive registration finishes within a minute, the latency can be hidden by the gap between two run + * of {@link TaskStateCollectorService}. + */ + +public class HiveRegTaskStateCollectorServiceHandlerImpl implements TaskStateCollectorServiceHandler { + + private HiveRegistrationPublisher hiveRegHandler; + + public HiveRegTaskStateCollectorServiceHandlerImpl(JobState jobState){ + hiveRegHandler = new HiveRegistrationPublisher(jobState); + } + + @Override + public void handle(Collection<? 
extends WorkUnitState> taskStates) { + try { + this.hiveRegHandler.publishData(taskStates); + }catch (IOException ioe){ + throw new RuntimeException("Hive-registration pushling of data in TaskStateCollector run into IOException:", ioe); + } + } + + @Override + public void close() throws IOException { + hiveRegHandler.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/067d4223/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java index f832257..6f19243 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java @@ -17,9 +17,6 @@ package org.apache.gobblin.runtime; -import com.google.common.base.Predicate; -import org.apache.gobblin.metastore.FsStateStore; -import org.apache.gobblin.metastore.StateStore; import java.io.IOException; import java.util.List; import java.util.Properties; @@ -35,9 +32,15 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Queues; import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.AbstractScheduledService; +import com.google.common.base.Predicate; +import com.google.common.io.Closer; +import com.google.common.base.Optional; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ParallelRunner; +import org.apache.gobblin.metastore.FsStateStore; +import org.apache.gobblin.metastore.StateStore; /** @@ -66,6 +69,14 @@ public class TaskStateCollectorService extends AbstractScheduledService { private final Path outputTaskStateDir; + /** + * Add a 
cloesable action to run after each existence-checking of task state file. + * A typical example to plug here is hive registration: + * We do hive registration everytime there are available taskStates deserialized from storage, on the driver level. + */ + public final Optional<TaskStateCollectorServiceHandler> optionalTaskCollectorHandler; + private final Closer handlerCloser = Closer.create(); + public TaskStateCollectorService(Properties jobProps, JobState jobState, EventBus eventBus, StateStore<TaskState> taskStateStore, Path outputTaskStateDir) { this.jobState = jobState; @@ -79,6 +90,22 @@ public class TaskStateCollectorService extends AbstractScheduledService { this.outputTaskStatesCollectorIntervalSeconds = Integer.parseInt(jobProps.getProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_INTERVAL_SECONDS, Integer.toString(ConfigurationKeys.DEFAULT_TASK_STATE_COLLECTOR_INTERVAL_SECONDS))); + + if (jobProps.containsKey(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS)){ + String handlerTypeName = jobProps.getProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS); + try{ + ClassAliasResolver<TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory> aliasResolver = + new ClassAliasResolver<>(TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory.class); + TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory handlerFactory = + aliasResolver.resolveClass(handlerTypeName).newInstance(); + optionalTaskCollectorHandler = Optional.of(handlerCloser.register(handlerFactory.createHandler(this.jobState))); + } catch (ReflectiveOperationException rfe){ + throw new RuntimeException("Could not construct TaskCollectorHandler " + handlerTypeName, rfe); + } + } + else{ + optionalTaskCollectorHandler = Optional.absent(); + } } @Override @@ -105,6 +132,7 @@ public class TaskStateCollectorService extends AbstractScheduledService { runOneIteration(); } finally { super.shutDown(); + this.handlerCloser.close(); } } @@ -160,6 
+188,13 @@ public class TaskStateCollectorService extends AbstractScheduledService { this.jobState.addTaskState(taskState); } + // Finish any addtional steps defined in handler on driver level. + // Currently implemented handler for Hive registration only. + if (optionalTaskCollectorHandler.isPresent()){ + LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks"); + optionalTaskCollectorHandler.get().handle(taskStateQueue); + } + // Notify the listeners for the completion of the tasks this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue))); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/067d4223/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java new file mode 100644 index 0000000..a35964c --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime; + +import org.apache.gobblin.configuration.WorkUnitState; +import java.io.Closeable; +import java.util.Collection; + + +/** + * Define basic interface for Handler in TaskStateCollectorService, + * which runs in the gobblin's driver level. + * + */ +public interface TaskStateCollectorServiceHandler extends Closeable { + + /** + * Interface of handler factory. + */ + interface TaskStateCollectorServiceHandlerFactory{ + TaskStateCollectorServiceHandler createHandler(JobState jobState); + } + + /** + * Execute the actions of handler. + */ + public void handle(Collection<? extends WorkUnitState> states) ; +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/067d4223/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHiveRegHandlerFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHiveRegHandlerFactory.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHiveRegHandlerFactory.java new file mode 100644 index 0000000..46f1b7e --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHiveRegHandlerFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime; + +import org.apache.gobblin.annotation.Alias; + +@Alias("hivereg") +/** + * Implementation of TaskStateCollectorServiceHandlerFactory that specific to Hive Registration as the action to be + * taken in TaskStateCollectorService. + */ +public class TaskStateCollectorServiceHiveRegHandlerFactory + implements TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory { + @Override + public TaskStateCollectorServiceHandler createHandler(JobState jobState) { + return new HiveRegTaskStateCollectorServiceHandlerImpl(jobState); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/067d4223/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java index a0731eb..287d7f4 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java @@ -17,6 +17,7 @@ package org.apache.gobblin.runtime; +import org.apache.gobblin.configuration.ConfigurationKeys; import java.io.IOException; import java.util.Map; import java.util.Properties; @@ -71,6 +72,7 @@ public class TaskStateCollectorServiceTest { this.taskStateStore = new FsStateStore<>(this.localFs, 
this.outputTaskStateDir.toUri().getPath(), TaskState.class); + this.taskStateCollectorService = new TaskStateCollectorService(new Properties(), this.jobState, this.eventBus, this.taskStateStore, new Path(this.outputTaskStateDir, JOB_ID)); @@ -101,6 +103,18 @@ public class TaskStateCollectorServiceTest { Assert.assertEquals(this.taskStateMap.get(TASK_ID_1).getTaskId(), TASK_ID_1); } + @Test + public void testHandlerResolution() throws Exception{ + Properties props = new Properties(); + props.setProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS, "hivereg"); + TaskStateCollectorService taskStateCollectorServiceHive = new TaskStateCollectorService(props, this.jobState, this.eventBus, + this.taskStateStore, new Path(this.outputTaskStateDir, JOB_ID + "_prime")); + Assert.assertEquals(taskStateCollectorServiceHive.optionalTaskCollectorHandler.get().getClass().getName(), + "org.apache.gobblin.runtime.HiveRegTaskStateCollectorServiceHandlerImpl"); + taskStateCollectorServiceHive.shutDown(); + return; + } + @AfterClass public void tearDown() throws IOException { if (this.localFs.exists(this.outputTaskStateDir)) {