Repository: incubator-gobblin
Updated Branches:
  refs/heads/master f96379e11 -> 067d42233


Pipelined Hive registration; configurable path deduplication in Hive registration; make 
TaskStateCollectorServiceHandler generic; add some Javadocs


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/067d4223
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/067d4223
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/067d4223

Branch: refs/heads/master
Commit: 067d42233fab93b82f62f0569159679a0b5102fa
Parents: f96379e
Author: Lei Sun <autumn...@gmail.com>
Authored: Mon Aug 7 14:55:03 2017 -0700
Committer: Issac Buenrostro <ibuen...@apache.org>
Committed: Tue Aug 8 10:47:25 2017 -0700

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  1 +
 .../publisher/HiveRegistrationPublisher.java    | 36 +++++++++++--
 ...RegTaskStateCollectorServiceHandlerImpl.java | 56 ++++++++++++++++++++
 .../runtime/TaskStateCollectorService.java      | 41 ++++++++++++--
 .../TaskStateCollectorServiceHandler.java       | 43 +++++++++++++++
 ...teCollectorServiceHiveRegHandlerFactory.java | 33 ++++++++++++
 .../runtime/TaskStateCollectorServiceTest.java  | 14 +++++
 7 files changed, 218 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/067d4223/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 79e9b98..36921be 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -567,6 +567,7 @@ public class ConfigurationKeys {
    */
   public static final String TASK_STATE_COLLECTOR_INTERVAL_SECONDS = 
"task.state.collector.interval.secs";
   public static final int DEFAULT_TASK_STATE_COLLECTOR_INTERVAL_SECONDS = 60;
+  public static final String TASK_STATE_COLLECTOR_HANDLER_CLASS = 
"task.state.collector.handler.class";
 
   /**
    * Configuration properties for email settings.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/067d4223/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
index 281acbe..17370b7 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/HiveRegistrationPublisher.java
@@ -36,6 +36,7 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.Sets;
 import com.google.common.io.Closer;
 
+import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
@@ -65,22 +66,47 @@ import lombok.extern.slf4j.Slf4j;
  * @author Ziyang Liu
  */
 @Slf4j
+@Alias("hivereg")
 public class HiveRegistrationPublisher extends DataPublisher {
 
   private static final String DATA_PUBLISH_TIME = 
HiveRegistrationPublisher.class.getName() + ".lastDataPublishTime";
   private static final Splitter LIST_SPLITTER_COMMA = 
Splitter.on(",").trimResults().omitEmptyStrings();
   public static final String HIVE_SPEC_COMPUTATION_TIMER = 
"hiveSpecComputationTimer";
+  private static final String PATH_DEDUPE_ENABLED = 
"hive.registration.path.dedupe.enabled";
+  private static final boolean DEFAULT_PATH_DEDUPE_ENABLED = true;
+
   private final Closer closer = Closer.create();
   private final HiveRegister hiveRegister;
   private final ExecutorService hivePolicyExecutor;
   private final MetricContext metricContext;
 
+  /**
+   * Configuration that determines whether path deduplication is enabled 
during the Hive registration process.
+   * Recall that Hive registration iterates through each topic's data folder and 
obtains the schema from the newest partition,
+   * so a table corresponding to an already-registered path may have had 
its schema changed.
+   * In this case, path deduplication won't work.
+   *
+   * E.g., in streaming mode, files (e.g. Avro) under a 
single topic folder may carry different schemas.
+   */
+  private boolean isPathDedupeEnabled;
+
+  /**
+   * Deduplicates the paths to be registered at the publisher level,
+   * so that each invocation of {@link #publishData(Collection)} contributes 
the paths it registers to this set.
+   */
+  private static Set<String> pathsToRegisterFromSingleState = 
Sets.newHashSet();
+
+  /**
+   * @param state This is a Job State
+   */
   public HiveRegistrationPublisher(State state) {
     super(state);
     this.hiveRegister = this.closer.register(HiveRegister.get(state));
     this.hivePolicyExecutor = 
ExecutorsUtils.loggingDecorator(Executors.newFixedThreadPool(new 
HiveRegProps(state).getNumThreads(),
         ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("HivePolicyExecutor-%d"))));
     this.metricContext = Instrumented.getMetricContext(state, 
HiveRegistrationPublisher.class);
+
+    isPathDedupeEnabled = state.getPropAsBoolean(PATH_DEDUPE_ENABLED, 
this.DEFAULT_PATH_DEDUPE_ENABLED);
   }
 
   @Override
@@ -96,6 +122,9 @@ public class HiveRegistrationPublisher extends DataPublisher 
{
   @Override
   public void initialize() throws IOException {}
 
+  /**
+   * @param states This is a collection of TaskState.
+   */
   @Override
   public void publishData(Collection<? extends WorkUnitState> states) throws 
IOException {
     CompletionService<Collection<HiveSpec>> completionService =
@@ -107,7 +136,7 @@ public class HiveRegistrationPublisher extends 
DataPublisher {
 
     // Here all runtime task-level props are injected into superstate which 
installed in each Policy Object.
     // runtime.props are comma-separated props collected in runtime.
-    Set<String> pathsToRegisterFromSingleState = Sets.newHashSet() ;
+    int toRegisterPathCount = 0 ;
     for (State state:states) {
       State taskSpecificState = state;
       if (state.contains(ConfigurationKeys.PUBLISHER_DIRS)) {
@@ -125,10 +154,11 @@ public class HiveRegistrationPublisher extends 
DataPublisher {
 
         final HiveRegistrationPolicy policy = 
HiveRegistrationPolicyBase.getPolicy(taskSpecificState);
         for ( final String path : 
state.getPropAsList(ConfigurationKeys.PUBLISHER_DIRS) ) {
-          if (pathsToRegisterFromSingleState.contains(path)){
+          if (isPathDedupeEnabled && 
pathsToRegisterFromSingleState.contains(path)){
             continue;
           }
           pathsToRegisterFromSingleState.add(path);
+          toRegisterPathCount += 1;
           completionService.submit(new Callable<Collection<HiveSpec>>() {
             @Override
             public Collection<HiveSpec> call() throws Exception {
@@ -141,7 +171,7 @@ public class HiveRegistrationPublisher extends 
DataPublisher {
       }
       else continue;
     }
-    for (int i = 0; i < pathsToRegisterFromSingleState.size(); i++) {
+    for (int i = 0; i < toRegisterPathCount; i++) {
       try {
         for (HiveSpec spec : completionService.take().get()) {
           this.hiveRegister.register(spec);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/067d4223/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
new file mode 100644
index 0000000..ecbd1a5
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/HiveRegTaskStateCollectorServiceHandlerImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.publisher.HiveRegistrationPublisher;
+
+
+/**
+ * A {@link TaskStateCollectorServiceHandler} implementation that executes Hive 
registration at the driver level.
+ * It registers all {@link TaskState}s once they are available.
+ * Since {@link TaskStateCollectorService} is by default invoked every 
minute,
+ * if a single batch of Hive registration finishes within a minute, the 
latency can be hidden by the gap between two runs
+ * of {@link TaskStateCollectorService}.
+ */
+
+public class HiveRegTaskStateCollectorServiceHandlerImpl implements 
TaskStateCollectorServiceHandler {
+
+  private HiveRegistrationPublisher hiveRegHandler;
+
+  public HiveRegTaskStateCollectorServiceHandlerImpl(JobState jobState){
+    hiveRegHandler = new HiveRegistrationPublisher(jobState);
+  }
+
+  @Override
+  public void handle(Collection<? extends WorkUnitState> taskStates) {
+    try {
+      this.hiveRegHandler.publishData(taskStates);
+    }catch (IOException ioe){
+      throw new RuntimeException("Hive-registration pushling of data in 
TaskStateCollector run into IOException:", ioe);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    hiveRegHandler.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/067d4223/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
index f832257..6f19243 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
@@ -17,9 +17,6 @@
 
 package org.apache.gobblin.runtime;
 
-import com.google.common.base.Predicate;
-import org.apache.gobblin.metastore.FsStateStore;
-import org.apache.gobblin.metastore.StateStore;
 import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
@@ -35,9 +32,15 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Queues;
 import com.google.common.eventbus.EventBus;
 import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.common.base.Predicate;
+import com.google.common.io.Closer;
+import com.google.common.base.Optional;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.metastore.FsStateStore;
+import org.apache.gobblin.metastore.StateStore;
 
 
 /**
@@ -66,6 +69,14 @@ public class TaskStateCollectorService extends 
AbstractScheduledService {
 
   private final Path outputTaskStateDir;
 
+  /**
+   * A closeable action to run after each existence check of the task state 
files.
+   * A typical example to plug in here is Hive registration:
+   * we do Hive registration every time there are available task states 
deserialized from storage, at the driver level.
+   */
+  public final Optional<TaskStateCollectorServiceHandler> 
optionalTaskCollectorHandler;
+  private final Closer handlerCloser = Closer.create();
+
   public TaskStateCollectorService(Properties jobProps, JobState jobState, 
EventBus eventBus,
       StateStore<TaskState> taskStateStore, Path outputTaskStateDir) {
     this.jobState = jobState;
@@ -79,6 +90,22 @@ public class TaskStateCollectorService extends 
AbstractScheduledService {
     this.outputTaskStatesCollectorIntervalSeconds =
         
Integer.parseInt(jobProps.getProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_INTERVAL_SECONDS,
             
Integer.toString(ConfigurationKeys.DEFAULT_TASK_STATE_COLLECTOR_INTERVAL_SECONDS)));
+
+    if 
(jobProps.containsKey(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS)){
+      String handlerTypeName = 
jobProps.getProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS);
+      try{
+        
ClassAliasResolver<TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory>
 aliasResolver =
+            new 
ClassAliasResolver<>(TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory.class);
+        
TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory 
handlerFactory =
+            aliasResolver.resolveClass(handlerTypeName).newInstance();
+        optionalTaskCollectorHandler = 
Optional.of(handlerCloser.register(handlerFactory.createHandler(this.jobState)));
+      } catch (ReflectiveOperationException rfe){
+        throw new RuntimeException("Could not construct TaskCollectorHandler " 
+ handlerTypeName, rfe);
+      }
+    }
+    else{
+      optionalTaskCollectorHandler = Optional.absent();
+    }
   }
 
   @Override
@@ -105,6 +132,7 @@ public class TaskStateCollectorService extends 
AbstractScheduledService {
       runOneIteration();
     } finally {
       super.shutDown();
+      this.handlerCloser.close();
     }
   }
 
@@ -160,6 +188,13 @@ public class TaskStateCollectorService extends 
AbstractScheduledService {
       this.jobState.addTaskState(taskState);
     }
 
+    // Finish any additional steps defined in the handler at the driver level.
+    // Currently a handler is implemented for Hive registration only.
+    if (optionalTaskCollectorHandler.isPresent()){
+      LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " + 
taskStateQueue.size() + " tasks");
+      optionalTaskCollectorHandler.get().handle(taskStateQueue);
+    }
+
     // Notify the listeners for the completion of the tasks
     this.eventBus.post(new 
NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/067d4223/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java
new file mode 100644
index 0000000..a35964c
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import java.io.Closeable;
+import java.util.Collection;
+
+
+/**
+ * Defines the basic interface for a handler in TaskStateCollectorService,
+ * which runs at Gobblin's driver level.
+ *
+ */
+public interface TaskStateCollectorServiceHandler extends Closeable {
+
+  /**
+   * Interface of handler factory.
+   */
+  interface TaskStateCollectorServiceHandlerFactory{
+    TaskStateCollectorServiceHandler createHandler(JobState jobState);
+  }
+
+  /**
+   * Executes the actions of the handler.
+   */
+  public void handle(Collection<? extends WorkUnitState> states)  ;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/067d4223/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHiveRegHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHiveRegHandlerFactory.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHiveRegHandlerFactory.java
new file mode 100644
index 0000000..46f1b7e
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorServiceHiveRegHandlerFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import org.apache.gobblin.annotation.Alias;
+
+@Alias("hivereg")
+/**
+ * Implementation of TaskStateCollectorServiceHandlerFactory that is specific to 
Hive registration as the action to be
+ * taken in TaskStateCollectorService.
+ */
+public class TaskStateCollectorServiceHiveRegHandlerFactory
+    implements 
TaskStateCollectorServiceHandler.TaskStateCollectorServiceHandlerFactory {
+  @Override
+  public TaskStateCollectorServiceHandler createHandler(JobState jobState) {
+    return new HiveRegTaskStateCollectorServiceHandlerImpl(jobState);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/067d4223/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java
index a0731eb..287d7f4 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskStateCollectorServiceTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.runtime;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Properties;
@@ -71,6 +72,7 @@ public class TaskStateCollectorServiceTest {
 
     this.taskStateStore = new FsStateStore<>(this.localFs, 
this.outputTaskStateDir.toUri().getPath(), TaskState.class);
 
+
     this.taskStateCollectorService = new TaskStateCollectorService(new 
Properties(), this.jobState, this.eventBus,
         this.taskStateStore, new Path(this.outputTaskStateDir, JOB_ID));
 
@@ -101,6 +103,18 @@ public class TaskStateCollectorServiceTest {
     Assert.assertEquals(this.taskStateMap.get(TASK_ID_1).getTaskId(), 
TASK_ID_1);
   }
 
+  @Test
+  public void testHandlerResolution() throws Exception{
+    Properties props = new Properties();
+    props.setProperty(ConfigurationKeys.TASK_STATE_COLLECTOR_HANDLER_CLASS, 
"hivereg");
+    TaskStateCollectorService taskStateCollectorServiceHive = new 
TaskStateCollectorService(props, this.jobState, this.eventBus,
+        this.taskStateStore, new Path(this.outputTaskStateDir, JOB_ID + 
"_prime"));
+    
Assert.assertEquals(taskStateCollectorServiceHive.optionalTaskCollectorHandler.get().getClass().getName(),
+        
"org.apache.gobblin.runtime.HiveRegTaskStateCollectorServiceHandlerImpl");
+    taskStateCollectorServiceHive.shutDown();
+    return;
+  }
+
   @AfterClass
   public void tearDown() throws IOException {
     if (this.localFs.exists(this.outputTaskStateDir)) {

Reply via email to