This is an automated email from the ASF dual-hosted git repository.

kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2801321  [GOBBLIN-1216] Embedded Hive Distcp
2801321 is described below

commit 28013218552657c7933cc61547657be3caf90d73
Author: Lei Sun <[email protected]>
AuthorDate: Tue Jul 21 11:29:21 2020 -0700

    [GOBBLIN-1216] Embedded Hive Distcp
    
    Embedded Hive Distcp
    
    Address comments
    
    Closes #3064 from autumnust/embeddedHiveDistcp-new
---
 .../runtime/embedded/EmbeddedGobblinDistcp.java    |   8 +-
 .../embedded/EmbeddedGobblinDistcpTest.java        | 119 +++++++++++++++++++--
 .../src/test/resources/hive-site.xml               |  14 +++
 .../gobblin/runtime/embedded/EmbeddedGobblin.java  |   6 +-
 .../main/resources/templates/hiveDistcp.template   |  96 +++++++++++++++++
 5 files changed, 230 insertions(+), 13 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
index f7076bd..f61a9fa 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
@@ -68,10 +68,16 @@ public class EmbeddedGobblinDistcp extends EmbeddedGobblin {
     }
   }
 
+  // For backward-compatibility, default to distcp.template
   public EmbeddedGobblinDistcp(Path from, Path to) throws 
JobTemplate.TemplateException, IOException {
+    this("templates/distcp.template", from, to);
+  }
+
+  // A constructor that loads the job template from the specified resource path.
+  public EmbeddedGobblinDistcp(String templateLoc, Path from, Path to) throws 
JobTemplate.TemplateException, IOException {
     super("Distcp");
     try {
-      
setTemplate(ResourceBasedJobTemplate.forResourcePath("templates/distcp.template"));
+      setTemplate(ResourceBasedJobTemplate.forResourcePath(templateLoc));
     } catch (URISyntaxException | SpecNotFoundException exc) {
       throw new RuntimeException("Could not instantiate an " + 
EmbeddedGobblinDistcp.class.getName(), exc);
     }
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index 18da5e8..a54ba95 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -21,28 +21,29 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.converter.GobblinMetricsPinotFlattenerConverter;
-import org.apache.gobblin.data.management.copy.CopyConfiguration;
-import org.apache.gobblin.data.management.copy.CopySource;
-import org.apache.gobblin.data.management.copy.SchemaCheckedCopySource;
-import org.apache.gobblin.runtime.api.JobExecutionResult;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
-
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.api.client.util.Charsets;
@@ -50,8 +51,40 @@ import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.converter.GobblinMetricsPinotFlattenerConverter;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.SchemaCheckedCopySource;
+import org.apache.gobblin.runtime.api.JobExecutionResult;
+import org.apache.gobblin.util.HiveJdbcConnector;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
+
 
 public class EmbeddedGobblinDistcpTest {
+  private HiveJdbcConnector jdbcConnector;
+  private IMetaStoreClient metaStoreClient;
+  private static final String TEST_DB = "testdb";
+  private static final String TEST_TABLE = "test_table";
+  private static final String TARGET_PATH = "/tmp/target";
+  private static final String TARGET_DB = "target";
+
+  @BeforeClass
+  public void setup() throws Exception {
+    try {
+      HiveConf hiveConf = new HiveConf();
+      // Start a Hive session in this thread and register the UDF
+      SessionState.start(hiveConf);
+      SessionState.get().initTxnMgr(hiveConf);
+      metaStoreClient = new HiveMetaStoreClient(new HiveConf());
+      jdbcConnector = HiveJdbcConnector.newEmbeddedConnector(2);
+    } catch (HiveException he) {
+      throw new RuntimeException("Failed to start Hive session.", he);
+    } catch (SQLException se) {
+      throw new RuntimeException("Cannot initialize the jdbc-connector due to: 
", se);
+    }
+  }
 
   @Test
   public void test() throws Exception {
@@ -84,6 +117,74 @@ public class EmbeddedGobblinDistcpTest {
   }
 
   @Test
+  public void hiveTest() throws Exception {
+    Statement statement = jdbcConnector.getConnection().createStatement();
+
+    // Start from a fresh Hive backup: No DB, no table.
+    // Create a DB.
+    statement.execute("CREATE database if not exists " + TEST_DB);
+
+    // Create a table.
+    String tableCreationSQL = "CREATE TABLE IF NOT EXISTS $testdb.$test_table 
(id int, name String)\n" + "ROW FORMAT DELIMITED\n"
+        + "FIELDS TERMINATED BY '\\t'\n" + "LINES TERMINATED BY '\\n'\n" + 
"STORED AS TEXTFILE";
+    
statement.execute(tableCreationSQL.replace("$testdb",TEST_DB).replace("$test_table",
 TEST_TABLE));
+
+    // Insert data
+    String dataInsertionSQL = "INSERT INTO TABLE $testdb.$test_table VALUES 
(1, 'one'), (2, 'two'), (3, 'three')";
+    
statement.execute(dataInsertionSQL.replace("$testdb",TEST_DB).replace("$test_table",
 TEST_TABLE));
+    String templateLoc = "templates/hiveDistcp.template";
+
+    // Neither the "from" nor the "to" path is actually used here, since this is a Hive Distcp
+    // driven by the template configuration; dummy paths suffice.
+    EmbeddedGobblinDistcp embeddedHiveDistcp =
+        new EmbeddedGobblinDistcp(templateLoc, new Path("a"), new Path("b"));
+    embeddedHiveDistcp.setConfiguration("hive.dataset.copy.target.database", 
TARGET_DB);
+    
embeddedHiveDistcp.setConfiguration("hive.dataset.copy.target.table.prefixReplacement",
 TARGET_PATH);
+
+    String dbPathTemplate = "/$testdb.db/$test_table";
+    String rootPathOfSourceDate = 
metaStoreClient.getConfigValue("hive.metastore.warehouse.dir", "")
+        .concat(dbPathTemplate.replace("$testdb", 
TEST_DB).replace("$test_table",TEST_TABLE)
+    );
+    
embeddedHiveDistcp.setConfiguration("hive.dataset.copy.target.table.prefixToBeReplaced",
 rootPathOfSourceDate);
+    embeddedHiveDistcp.run();
+
+    // Verify that the table exists in the target DB and that the copied file exists in the target location.
+    metaStoreClient.tableExists(TARGET_DB, TEST_TABLE);
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    fs.exists(new Path(TARGET_PATH));
+  }
+
+  // Tear down the Hive components backed by the embedded Derby driver, removing anything generated during the test.
+  @AfterClass
+  public void hiveTearDown() throws Exception {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Path targetPath = new Path(TARGET_PATH);
+    if (fs.exists(targetPath)) {
+      fs.delete(targetPath, true);
+    }
+
+    if (metaStoreClient != null) {
+      // Clean the source table and DB
+      if (metaStoreClient.tableExists(TEST_DB, TEST_TABLE)) {
+        metaStoreClient.dropTable(TEST_DB, TEST_TABLE);
+      }
+      if (metaStoreClient.getAllDatabases().contains(TEST_DB)) {
+        metaStoreClient.dropDatabase(TEST_DB);
+      }
+
+      // Clean the target table and DB
+      if (metaStoreClient.tableExists("target", TEST_TABLE)) {
+        metaStoreClient.dropTable("target", TEST_TABLE, true, true);
+      }
+      if (metaStoreClient.getAllDatabases().contains(TARGET_DB)) {
+        metaStoreClient.dropDatabase(TARGET_DB);
+      }
+      metaStoreClient.close();
+    }
+
+    jdbcConnector.close();
+  }
+
+  @Test
   public void testCheckSchema() throws Exception {
     Schema schema = null;
     try (InputStream is = 
GobblinMetricsPinotFlattenerConverter.class.getClassLoader().getResourceAsStream("avroSchemaManagerTest/expectedSchema.avsc"))
 {
diff --git a/gobblin-data-management/src/test/resources/hive-site.xml 
b/gobblin-data-management/src/test/resources/hive-site.xml
new file mode 100644
index 0000000..ae1b4cd
--- /dev/null
+++ b/gobblin-data-management/src/test/resources/hive-site.xml
@@ -0,0 +1,14 @@
+<configuration>
+  <property>
+    <name>javax.jdo.option.ConnectionDriverName</name>
+    <value>org.apache.derby.jdbc.EmbeddedDriver</value>
+  </property>
+  <property>
+    <name>javax.jdo.option.ConnectionURL</name>
+    
<value>jdbc:derby:;databaseName=/tmp/scratch/hive/metastore_db;create=true</value>
+  </property>
+  <property>
+    <name>hive.metastore.warehouse.dir</name>
+    <value>/tmp/scratch/hive/warehouse</value>
+  </property>
+</configuration>
\ No newline at end of file
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
index 1a1e858..d723ff8 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
@@ -142,9 +142,9 @@ public class EmbeddedGobblin {
   private Runnable distributeJarsFunction;
   private JobTemplate template;
   private Logger useLog = log;
-  private FullTimeout launchTimeout = new FullTimeout(10, TimeUnit.SECONDS);
-  private FullTimeout jobTimeout = new FullTimeout(10, TimeUnit.DAYS);
-  private FullTimeout shutdownTimeout = new FullTimeout(10, TimeUnit.SECONDS);
+  private FullTimeout launchTimeout = new FullTimeout(100, TimeUnit.SECONDS);
+  private FullTimeout jobTimeout = new FullTimeout(100, TimeUnit.DAYS);
+  private FullTimeout shutdownTimeout = new FullTimeout(100, TimeUnit.SECONDS);
   private boolean dumpJStackOnTimeout = false;
   private List<GobblinInstancePluginFactory> plugins = Lists.newArrayList();
   private Optional<Path> jobFile = Optional.absent();
diff --git a/gobblin-runtime/src/main/resources/templates/hiveDistcp.template 
b/gobblin-runtime/src/main/resources/templates/hiveDistcp.template
new file mode 100644
index 0000000..d85144d
--- /dev/null
+++ b/gobblin-runtime/src/main/resources/templates/hiveDistcp.template
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# ====================================================================
+# Job configurations (can be changed)
+# ====================================================================
+
+# General job metadata
+job.group=DistcpHive
+job.name=EmbeddedDistcpHiveTracking
+job.description=Embedded Hive-Distcp-ng toy job
+
+// URI comes from hiveConf, don't have to fill anything here.
+hive.metastore.uri=""
+hive.db.root.dir=/tmp
+
+# Source and target metastores
+hive.dataset.hive.metastore.uri=${hive.metastore.uri}
+hive.dataset.copy.target.metastore.uri=${hive.metastore.uri}
+gobblin.dataset.profile.class=org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder
+
+# Database and tables copy
+hive.dataset.whitelist=testdb.test_table
+#hive.dataset.copy.target.database=target
+
+# Conflicting table and partitions treatment
+hive.dataset.existing.entity.conflict.policy=REPLACE_PARTITIONS
+
+# What to do with the files when deregistering a partition
+hive.dataset.copy.deregister.fileDeleteMethod=NO_DELETE
+
+# Which files to copy when copying a partition:
+# INPUT_FORMAT -> use input format to find files, RECURSIVE -> copy all files 
under partition location
+hive.dataset.copy.location.listing.method=RECURSIVE
+# Should skip hidden paths when copying
+hive.dataset.copy.locations.listing.skipHiddenPaths=true
+
+# Use registration time (Hive parameter) to determine whether a partition 
should be skipped
+hive.dataset.copy.fast.partition.skip.predicate=org.apache.gobblin.data.management.copy.predicates.RegistrationTimeSkipPredicate
+
+# Partition filter
+hive.dataset.copy.partition.filter.generator=org.apache.gobblin.data.management.copy.hive.filter.LookbackPartitionFilterGenerator
+hive.dataset.partition.filter.datetime.column=datepartition
+hive.dataset.partition.filter.datetime.lookback=P7D
+hive.dataset.partition.filter.datetime.format=YYYY-MM-dd-HH
+
+# Preserve attributes
+gobblin.copy.preserved.attributes=rgbp
+
+# Simulate?
+gobblin.copy.simulate=false
+
+# Bin packing
+# 250 MB
+gobblin.copy.binPacking.maxSizePerBin=250000000
+
+# Trash
+gobblin.trash.location=/data/trash/tracking/_GOBBLIN_TRASH
+
+# target location for copy
+data.publisher.final.dir=${hive.dataset.copy.target.table.prefixReplacement}
+
+# ====================================================================
+# Distcp configurations
+# ====================================================================
+
+extract.namespace=org.apache.gobblin.copy
+data.publisher.type=org.apache.gobblin.data.management.copy.publisher.CopyDataPublisher
+source.class=org.apache.gobblin.data.management.copy.CopySource
+writer.builder.class=org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder
+converter.classes=org.apache.gobblin.converter.IdentityConverter
+
+task.maxretries=0
+workunit.retry.enabled=false
+user.defined.staging.dir.flag=false
+
+distcp.persist.dir=/tmp/distcp-persist-dir
+
+cleanup.staging.data.per.task=false
+gobblin.trash.skip.trash=true
+state.store.enabled=false
+job.commit.parallelize=true

Reply via email to