This is an automated email from the ASF dual-hosted git repository.
kuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2801321 [GOBBLIN-1216] Embedded Hive Distcp
2801321 is described below
commit 28013218552657c7933cc61547657be3caf90d73
Author: Lei Sun <[email protected]>
AuthorDate: Tue Jul 21 11:29:21 2020 -0700
[GOBBLIN-1216] Embedded Hive Distcp
Embedded Hive Distcp
Address comments
Closes #3064 from autumnust/embeddedHiveDistcp-new
---
.../runtime/embedded/EmbeddedGobblinDistcp.java | 8 +-
.../embedded/EmbeddedGobblinDistcpTest.java | 119 +++++++++++++++++++--
.../src/test/resources/hive-site.xml | 14 +++
.../gobblin/runtime/embedded/EmbeddedGobblin.java | 6 +-
.../main/resources/templates/hiveDistcp.template | 96 +++++++++++++++++
5 files changed, 230 insertions(+), 13 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
index f7076bd..f61a9fa 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcp.java
@@ -68,10 +68,16 @@ public class EmbeddedGobblinDistcp extends EmbeddedGobblin {
}
}
+ // For backward-compatibility, default to distcp.template
public EmbeddedGobblinDistcp(Path from, Path to) throws
JobTemplate.TemplateException, IOException {
+ this("templates/distcp.template", from, to);
+ }
+
+ // A constructor that loads the specified job template resource.
+ public EmbeddedGobblinDistcp(String templateLoc, Path from, Path to) throws
JobTemplate.TemplateException, IOException {
super("Distcp");
try {
-
setTemplate(ResourceBasedJobTemplate.forResourcePath("templates/distcp.template"));
+ setTemplate(ResourceBasedJobTemplate.forResourcePath(templateLoc));
} catch (URISyntaxException | SpecNotFoundException exc) {
throw new RuntimeException("Could not instantiate an " +
EmbeddedGobblinDistcp.class.getName(), exc);
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index 18da5e8..a54ba95 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -21,28 +21,29 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.converter.GobblinMetricsPinotFlattenerConverter;
-import org.apache.gobblin.data.management.copy.CopyConfiguration;
-import org.apache.gobblin.data.management.copy.CopySource;
-import org.apache.gobblin.data.management.copy.SchemaCheckedCopySource;
-import org.apache.gobblin.runtime.api.JobExecutionResult;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
-
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.api.client.util.Charsets;
@@ -50,8 +51,40 @@ import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.typesafe.config.Config;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.converter.GobblinMetricsPinotFlattenerConverter;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.SchemaCheckedCopySource;
+import org.apache.gobblin.runtime.api.JobExecutionResult;
+import org.apache.gobblin.util.HiveJdbcConnector;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
+
public class EmbeddedGobblinDistcpTest {
+ private HiveJdbcConnector jdbcConnector;
+ private IMetaStoreClient metaStoreClient;
+ private static final String TEST_DB = "testdb";
+ private static final String TEST_TABLE = "test_table";
+ private static final String TARGET_PATH = "/tmp/target";
+ private static final String TARGET_DB = "target";
+
+ @BeforeClass
+ public void setup() throws Exception {
+ try {
+ HiveConf hiveConf = new HiveConf();
+ // Start a Hive session in this thread.
+ SessionState.start(hiveConf);
+ SessionState.get().initTxnMgr(hiveConf);
+ metaStoreClient = new HiveMetaStoreClient(new HiveConf());
+ jdbcConnector = HiveJdbcConnector.newEmbeddedConnector(2);
+ } catch (HiveException he) {
+ throw new RuntimeException("Failed to start Hive session.", he);
+ } catch (SQLException se) {
+ throw new RuntimeException("Cannot initialize the jdbc-connector due to:
", se);
+ }
+ }
@Test
public void test() throws Exception {
@@ -84,6 +117,74 @@ public class EmbeddedGobblinDistcpTest {
}
@Test
+ public void hiveTest() throws Exception {
+ Statement statement = jdbcConnector.getConnection().createStatement();
+
+ // Start from a fresh Hive backup: No DB, no table.
+ // Create a DB.
+ statement.execute("CREATE database if not exists " + TEST_DB);
+
+ // Create a table.
+ String tableCreationSQL = "CREATE TABLE IF NOT EXISTS $testdb.$test_table
(id int, name String)\n" + "ROW FORMAT DELIMITED\n"
+ + "FIELDS TERMINATED BY '\\t'\n" + "LINES TERMINATED BY '\\n'\n" +
"STORED AS TEXTFILE";
+
statement.execute(tableCreationSQL.replace("$testdb",TEST_DB).replace("$test_table",
TEST_TABLE));
+
+ // Insert data
+ String dataInsertionSQL = "INSERT INTO TABLE $testdb.$test_table VALUES
(1, 'one'), (2, 'two'), (3, 'three')";
+
statement.execute(dataInsertionSQL.replace("$testdb",TEST_DB).replace("$test_table",
TEST_TABLE));
+ String templateLoc = "templates/hiveDistcp.template";
+
+ // Neither "from" nor "to" is actually used here, since this is a Hive
Distcp.
+ EmbeddedGobblinDistcp embeddedHiveDistcp =
+ new EmbeddedGobblinDistcp(templateLoc, new Path("a"), new Path("b"));
+ embeddedHiveDistcp.setConfiguration("hive.dataset.copy.target.database",
TARGET_DB);
+
embeddedHiveDistcp.setConfiguration("hive.dataset.copy.target.table.prefixReplacement",
TARGET_PATH);
+
+ String dbPathTemplate = "/$testdb.db/$test_table";
+ String rootPathOfSourceDate =
metaStoreClient.getConfigValue("hive.metastore.warehouse.dir", "")
+ .concat(dbPathTemplate.replace("$testdb",
TEST_DB).replace("$test_table",TEST_TABLE)
+ );
+
embeddedHiveDistcp.setConfiguration("hive.dataset.copy.target.table.prefixToBeReplaced",
rootPathOfSourceDate);
+ embeddedHiveDistcp.run();
+
+ // Verify that the table exists in the target DB and the file exists in the target
location.
+ metaStoreClient.tableExists(TARGET_DB, TEST_TABLE);
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ fs.exists(new Path(TARGET_PATH));
+ }
+
+ // Tear down the Derby-backed Hive components and clean up anything
generated during the test.
+ @AfterClass
+ public void hiveTearDown() throws Exception {
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ Path targetPath = new Path(TARGET_PATH);
+ if (fs.exists(targetPath)) {
+ fs.delete(targetPath, true);
+ }
+
+ if (metaStoreClient != null) {
+ // Clean the source table and DB
+ if (metaStoreClient.tableExists(TEST_DB, TEST_TABLE)) {
+ metaStoreClient.dropTable(TEST_DB, TEST_TABLE);
+ }
+ if (metaStoreClient.getAllDatabases().contains(TEST_DB)) {
+ metaStoreClient.dropDatabase(TEST_DB);
+ }
+
+ // Clean the target table and DB
+ if (metaStoreClient.tableExists("target", TEST_TABLE)) {
+ metaStoreClient.dropTable("target", TEST_TABLE, true, true);
+ }
+ if (metaStoreClient.getAllDatabases().contains(TARGET_DB)) {
+ metaStoreClient.dropDatabase(TARGET_DB);
+ }
+ metaStoreClient.close();
+ }
+
+ jdbcConnector.close();
+ }
+
+ @Test
public void testCheckSchema() throws Exception {
Schema schema = null;
try (InputStream is =
GobblinMetricsPinotFlattenerConverter.class.getClassLoader().getResourceAsStream("avroSchemaManagerTest/expectedSchema.avsc"))
{
diff --git a/gobblin-data-management/src/test/resources/hive-site.xml
b/gobblin-data-management/src/test/resources/hive-site.xml
new file mode 100644
index 0000000..ae1b4cd
--- /dev/null
+++ b/gobblin-data-management/src/test/resources/hive-site.xml
@@ -0,0 +1,14 @@
+<configuration>
+ <property>
+ <name>javax.jdo.option.ConnectionDriverName</name>
+ <value>org.apache.derby.jdbc.EmbeddedDriver</value>
+ </property>
+ <property>
+ <name>javax.jdo.option.ConnectionURL</name>
+
<value>jdbc:derby:;databaseName=/tmp/scratch/hive/metastore_db;create=true</value>
+ </property>
+ <property>
+ <name>hive.metastore.warehouse.dir</name>
+ <value>/tmp/scratch/hive/warehouse</value>
+ </property>
+</configuration>
\ No newline at end of file
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
index 1a1e858..d723ff8 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
@@ -142,9 +142,9 @@ public class EmbeddedGobblin {
private Runnable distributeJarsFunction;
private JobTemplate template;
private Logger useLog = log;
- private FullTimeout launchTimeout = new FullTimeout(10, TimeUnit.SECONDS);
- private FullTimeout jobTimeout = new FullTimeout(10, TimeUnit.DAYS);
- private FullTimeout shutdownTimeout = new FullTimeout(10, TimeUnit.SECONDS);
+ private FullTimeout launchTimeout = new FullTimeout(100, TimeUnit.SECONDS);
+ private FullTimeout jobTimeout = new FullTimeout(100, TimeUnit.DAYS);
+ private FullTimeout shutdownTimeout = new FullTimeout(100, TimeUnit.SECONDS);
private boolean dumpJStackOnTimeout = false;
private List<GobblinInstancePluginFactory> plugins = Lists.newArrayList();
private Optional<Path> jobFile = Optional.absent();
diff --git a/gobblin-runtime/src/main/resources/templates/hiveDistcp.template
b/gobblin-runtime/src/main/resources/templates/hiveDistcp.template
new file mode 100644
index 0000000..d85144d
--- /dev/null
+++ b/gobblin-runtime/src/main/resources/templates/hiveDistcp.template
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# ====================================================================
+# Job configurations (can be changed)
+# ====================================================================
+
+# General job metadata
+job.group=DistcpHive
+job.name=EmbeddedDistcpHiveTracking
+job.description=Embedded Hive-Distcp-ng toy job
+
+# URI comes from hiveConf; nothing needs to be filled in here.
+hive.metastore.uri=""
+hive.db.root.dir=/tmp
+
+# Source and target metastores
+hive.dataset.hive.metastore.uri=${hive.metastore.uri}
+hive.dataset.copy.target.metastore.uri=${hive.metastore.uri}
+gobblin.dataset.profile.class=org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder
+
+# Database and tables copy
+hive.dataset.whitelist=testdb.test_table
+#hive.dataset.copy.target.database=target
+
+# Conflicting table and partitions treatment
+hive.dataset.existing.entity.conflict.policy=REPLACE_PARTITIONS
+
+# What to do with the files when deregistering a partition
+hive.dataset.copy.deregister.fileDeleteMethod=NO_DELETE
+
+# Which files to copy when copying a partition:
+# INPUT_FORMAT -> use input format to find files, RECURSIVE -> copy all files
under partition location
+hive.dataset.copy.location.listing.method=RECURSIVE
+# Should skip hidden paths when copying
+hive.dataset.copy.locations.listing.skipHiddenPaths=true
+
+# Use registration time (Hive parameter) to determine whether a partition
should be skipped
+hive.dataset.copy.fast.partition.skip.predicate=org.apache.gobblin.data.management.copy.predicates.RegistrationTimeSkipPredicate
+
+# Partition filter
+hive.dataset.copy.partition.filter.generator=org.apache.gobblin.data.management.copy.hive.filter.LookbackPartitionFilterGenerator
+hive.dataset.partition.filter.datetime.column=datepartition
+hive.dataset.partition.filter.datetime.lookback=P7D
+hive.dataset.partition.filter.datetime.format=YYYY-MM-dd-HH
+
+# Preserve attributes
+gobblin.copy.preserved.attributes=rgbp
+
+# Simulate?
+gobblin.copy.simulate=false
+
+# Bin packing
+# 250 MB
+gobblin.copy.binPacking.maxSizePerBin=250000000
+
+# Trash
+gobblin.trash.location=/data/trash/tracking/_GOBBLIN_TRASH
+
+# target location for copy
+data.publisher.final.dir=${hive.dataset.copy.target.table.prefixReplacement}
+
+# ====================================================================
+# Distcp configurations
+# ====================================================================
+
+extract.namespace=org.apache.gobblin.copy
+data.publisher.type=org.apache.gobblin.data.management.copy.publisher.CopyDataPublisher
+source.class=org.apache.gobblin.data.management.copy.CopySource
+writer.builder.class=org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder
+converter.classes=org.apache.gobblin.converter.IdentityConverter
+
+task.maxretries=0
+workunit.retry.enabled=false
+user.defined.staging.dir.flag=false
+
+distcp.persist.dir=/tmp/distcp-persist-dir
+
+cleanup.staging.data.per.task=false
+gobblin.trash.skip.trash=true
+state.store.enabled=false
+job.commit.parallelize=true