Repository: hive
Updated Branches:
  refs/heads/master 0dbb896cf -> b3ef75eaa


http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index f05c231..dac20d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.parse.repl.dump.io;
 
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -29,12 +28,16 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TJSONProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
 
 public class TableSerializer implements JsonWriter.Serializer {
   public static final String FIELD_NAME = "table";
+  private static final Logger LOG = LoggerFactory.getLogger(TableSerializer.class);
+
   private final org.apache.hadoop.hive.ql.metadata.Table tableHandle;
   private final Iterable<Partition> partitions;
   private final HiveConf hiveConf;
@@ -53,8 +56,9 @@ public class TableSerializer implements JsonWriter.Serializer {
       return;
     }
 
-    Table tTable = tableHandle.getTTable();
-    tTable = updatePropertiesInTable(tTable, additionalPropertiesProvider);
+    Table tTable = updatePropertiesInTable(
+        tableHandle.getTTable(), additionalPropertiesProvider
+    );
     try {
       TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
       writer.jsonGenerator
@@ -83,14 +87,6 @@ public class TableSerializer implements JsonWriter.Serializer {
                 ReplicationSpec.KEY.CURR_STATE_ID.toString(),
                 additionalPropertiesProvider.getCurrentReplicationState());
       }
-      if (isExternalTable(table)) {
-          // Replication destination will not be external - override if set
-        table.putToParameters("EXTERNAL", "FALSE");
-      }
-      if (isExternalTableType(table)) {
-          // Replication dest will not be external - override if set
-        table.setTableType(TableType.MANAGED_TABLE.toString());
-      }
     } else {
       // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE;
 //      write(out, ",\""+ scopeKey.toString() +"\":\"" + replicationSpec.get(scopeKey) + "\"");
@@ -101,17 +97,6 @@ public class TableSerializer implements JsonWriter.Serializer {
     return table;
   }
 
-  private boolean isExternalTableType(org.apache.hadoop.hive.metastore.api.Table table) {
-    return table.isSetTableType()
-        && table.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());
-  }
-
-  private boolean isExternalTable(org.apache.hadoop.hive.metastore.api.Table table) {
-    Map<String, String> params = table.getParameters();
-    return params.containsKey("EXTERNAL")
-        && params.get("EXTERNAL").equalsIgnoreCase("TRUE");
-  }
-
   private void writePartitions(JsonWriter writer, ReplicationSpec additionalPropertiesProvider)
       throws SemanticException, IOException {
     writer.jsonGenerator.writeStartArray();
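
The hunk above stops the dump from rewriting external tables as managed: the EXTERNAL=FALSE override and the MANAGED_TABLE reset are gone, so the serialized metadata keeps the original table type for REPL LOAD to act on. A minimal sketch of the behavioural difference against the thrift metastore Table (the demo class name is made up; TableType and Table are the real APIs used in the hunk):

    import org.apache.hadoop.hive.metastore.TableType;
    import org.apache.hadoop.hive.metastore.api.Table;

    // Illustration only: what the serializer used to do to an external table,
    // and what it does after this patch (nothing).
    public class ExternalFlagDemo {
      public static void main(String[] args) {
        Table t = new Table();
        t.setTableType(TableType.EXTERNAL_TABLE.toString());
        t.putToParameters("EXTERNAL", "TRUE");

        // Old behaviour (the removed lines above): force the dump to look managed.
        //   t.putToParameters("EXTERNAL", "FALSE");
        //   t.setTableType(TableType.MANAGED_TABLE.toString());

        // New behaviour: flag and type pass through untouched, so the load side
        // sees EXTERNAL_TABLE and can decide how to recreate the table.
        System.out.println(t.getTableType() + ", EXTERNAL=" + t.getParameters().get("EXTERNAL"));
      }
    }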

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
index 9907133..b04fdef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/MetadataJson.java
@@ -36,7 +36,6 @@ import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
-import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -99,14 +98,7 @@ public class MetadataJson {
   }
 
   private ReplicationSpec readReplicationSpec() {
-    com.google.common.base.Function<String, String> keyFetcher =
-        new com.google.common.base.Function<String, String>() {
-          @Override
-          public String apply(@Nullable String s) {
-            return jsonEntry(s);
-          }
-        };
-    return new ReplicationSpec(keyFetcher);
+    return new ReplicationSpec(this::jsonEntry);
   }
 
   private void checkCompatibility() throws SemanticException, JSONException {
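
The keyFetcher change compiles because com.google.common.base.Function has a single abstract method, so a method reference can stand in for the anonymous class. A small self-contained sketch of the equivalence (class name and the jsonEntry body are illustrative, not taken from MetadataJson):

    import com.google.common.base.Function;

    public class KeyFetcherDemo {
      // Stand-in for MetadataJson#jsonEntry.
      private String jsonEntry(String key) {
        return key == null ? null : key.toLowerCase();
      }

      // Before: an explicit anonymous class, as removed in the hunk above.
      Function<String, String> asAnonymousClass() {
        return new Function<String, String>() {
          @Override
          public String apply(String s) {
            return jsonEntry(s);
          }
        };
      }

      // After: the same thing as a method reference.
      Function<String, String> asMethodReference() {
        return this::jsonEntry;
      }
    }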

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
index d412fd7..fe89ab2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/InsertHandler.java
@@ -17,17 +17,41 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 
 public class InsertHandler extends AbstractMessageHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(InsertHandler.class);
+
   @Override
   public List<Task<? extends Serializable>> handle(Context withinContext)
       throws SemanticException {
+    try {
+      FileSystem fs =
+          FileSystem.get(new Path(withinContext.location).toUri(), withinContext.hiveConf);
+      MetaData metaData =
+          EximUtil.readMetaData(fs, new Path(withinContext.location, EximUtil.METADATA_NAME));
+      ReplicationSpec replicationSpec = metaData.getReplicationSpec();
+      if (replicationSpec.isNoop()) {
+        return Collections.emptyList();
+      }
+    } catch (Exception e) {
+      LOG.error("failed to load insert event", e);
+      throw new SemanticException(e);
+    }
+
     InsertMessage insertMessage = deserializer.getInsertMessage(withinContext.dmd.getPayload());
     String actualDbName =
         withinContext.isDbNameEmpty() ? insertMessage.getDB() : withinContext.dbName;
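
The new guard reads the event's _metadata file and returns early when its ReplicationSpec is marked no-op, so no insert tasks are built for such events. The same check isolated as a sketch (EventGuard/shouldSkip are made-up names; the FileSystem, EximUtil.readMetaData and getReplicationSpec calls are the ones used above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.parse.EximUtil;
    import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;

    public class EventGuard {
      // Returns true when the dump marked this event as a no-op, i.e. there is
      // nothing for the handler to replay on the target.
      static boolean shouldSkip(String eventLocation, Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(new Path(eventLocation).toUri(), conf);
        MetaData metaData = EximUtil.readMetaData(fs, new Path(eventLocation, EximUtil.METADATA_NAME));
        return metaData.getReplicationSpec().isNoop();
      }
    }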

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
index cdf51dd..4ae4894 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
@@ -89,6 +89,10 @@ public interface MessageHandler {
       return StringUtils.isEmpty(dbName);
     }
 
+    /**
+     * not sure why we have this, this should always be read from the _metadata file via the
+     * {@link org.apache.hadoop.hive.ql.parse.repl.load.MetadataJson#readReplicationSpec}
+     */
     ReplicationSpec eventOnlyReplicationSpec() throws SemanticException {
       String eventId = dmd.getEventTo().toString();
       return new ReplicationSpec(eventId, eventId);

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
index f5f4459..56c2abe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
@@ -17,36 +17,56 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.net.URI;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
-import static org.apache.hadoop.hive.ql.parse.repl.DumpType.EVENT_ALTER_PARTITION;
-import static org.apache.hadoop.hive.ql.parse.repl.DumpType.EVENT_ALTER_TABLE;
-
 public class TableHandler extends AbstractMessageHandler {
+  private static final long DEFAULT_WRITE_ID = 0L;
+  private static final Logger LOG = LoggerFactory.getLogger(TableHandler.class);
+
   @Override
   public List<Task<? extends Serializable>> handle(Context context) throws SemanticException {
     try {
       List<Task<? extends Serializable>> importTasks = new ArrayList<>();
-      long writeId = 0;
+      boolean isExternal = false, isLocationSet = false;
+      String parsedLocation = null;
 
-      if (context.dmd.getDumpType().equals(EVENT_ALTER_TABLE)) {
-        AlterTableMessage message = deserializer.getAlterTableMessage(context.dmd.getPayload());
-        writeId = message.getWriteId();
-      } else if (context.dmd.getDumpType().equals(EVENT_ALTER_PARTITION)) {
-        AlterPartitionMessage message = deserializer.getAlterPartitionMessage(context.dmd.getPayload());
-        writeId = message.getWriteId();
+      DumpType eventType = context.dmd.getDumpType();
+      Tuple tuple = extract(context);
+      if (tuple.isExternalTable) {
+        URI fromURI = EximUtil.getValidatedURI(context.hiveConf, context.location);
+        Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+        isLocationSet = true;
+        isExternal = true;
+        FileSystem fs = FileSystem.get(fromURI, context.hiveConf);
+        try {
+          MetaData rv = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+          Table table = new Table(rv.getTable());
+          parsedLocation = ReplExternalTables
+              .externalTableLocation(context.hiveConf, table.getSd().getLocation());
+        } catch (IOException e) {
+          throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+        }
       }
 
       context.nestedContext.setConf(context.hiveConf);
@@ -54,13 +74,13 @@ public class TableHandler extends AbstractMessageHandler {
           new EximUtil.SemanticAnalyzerWrapperContext(
               context.hiveConf, context.db, readEntitySet, writeEntitySet, importTasks, context.log,
               context.nestedContext);
-      x.setEventType(context.dmd.getDumpType());
+      x.setEventType(eventType);
 
       // REPL LOAD is not partition level. It is always DB or table level. So, passing null for partition specs.
       // Also, REPL LOAD doesn't support external table and hence no location set as well.
-      ImportSemanticAnalyzer.prepareImport(false, false, false, false,
-          (context.precursor != null), null, context.tableName, context.dbName,
-          null, context.location, x, updatedMetadata, context.getTxnMgr(), writeId);
+      ImportSemanticAnalyzer.prepareImport(false, isLocationSet, isExternal, false,
+          (context.precursor != null), parsedLocation, context.tableName, context.dbName,
+          null, context.location, x, updatedMetadata, context.getTxnMgr(), tuple.writeId);
 
       Task<? extends Serializable> openTxnTask = x.getOpenTxnTask();
       if (openTxnTask != null && !importTasks.isEmpty()) {
@@ -71,8 +91,57 @@ public class TableHandler extends AbstractMessageHandler {
       }
 
       return importTasks;
+    } catch (RuntimeException e){
+      throw e;
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  private Tuple extract(Context context) throws SemanticException {
+    try {
+      String tableType = null;
+      long writeId = DEFAULT_WRITE_ID;
+      switch (context.dmd.getDumpType()) {
+      case EVENT_CREATE_TABLE:
+      case EVENT_ADD_PARTITION:
+        Path metadataPath = new Path(context.location, EximUtil.METADATA_NAME);
+        MetaData rv = EximUtil.readMetaData(
+            metadataPath.getFileSystem(context.hiveConf),
+            metadataPath
+        );
+        tableType = rv.getTable().getTableType();
+        break;
+      case EVENT_ALTER_TABLE:
+        AlterTableMessage alterTableMessage =
+            deserializer.getAlterTableMessage(context.dmd.getPayload());
+        tableType = alterTableMessage.getTableObjAfter().getTableType();
+        writeId = alterTableMessage.getWriteId();
+        break;
+      case EVENT_ALTER_PARTITION:
+        AlterPartitionMessage msg = deserializer.getAlterPartitionMessage(context.dmd.getPayload());
+        tableType = msg.getTableObj().getTableType();
+        writeId = msg.getWriteId();
+        break;
+      default:
+        break;
+      }
+      boolean isExternalTable = tableType != null
+          && TableType.EXTERNAL_TABLE.equals(Enum.valueOf(TableType.class, tableType));
+      return new Tuple(isExternalTable, writeId);
     } catch (Exception e) {
+      LOG.error("failed to determine if the table associated with the event is external or not", e);
       throw new SemanticException(e);
     }
   }
+
+  private static final class Tuple {
+    private final boolean isExternalTable;
+    private final long writeId;
+
+    private Tuple(boolean isExternalTable, long writeId) {
+      this.isExternalTable = isExternalTable;
+      this.writeId = writeId;
+    }
+  }
 }
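
extract() derives externality from the table type string carried by the event payload, or by the _metadata file for create-table/add-partition events. Note that Enum.valueOf is case sensitive and throws IllegalArgumentException for unknown names; the surrounding catch turns that into a SemanticException. A minimal sketch of just that check (the demo class is made up; TableType is the real metastore enum):

    import org.apache.hadoop.hive.metastore.TableType;

    public class TableTypeCheck {
      static boolean isExternal(String tableType) {
        return tableType != null
            && TableType.EXTERNAL_TABLE.equals(Enum.valueOf(TableType.class, tableType));
      }

      public static void main(String[] args) {
        System.out.println(isExternal("EXTERNAL_TABLE")); // true
        System.out.println(isExternal("MANAGED_TABLE"));  // false
        System.out.println(isExternal(null));             // false
        // isExternal("external_table") would throw IllegalArgumentException
      }
    }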

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index 9e5c071..39f342f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -44,9 +46,11 @@ import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.powermock.api.mockito.PowerMockito.verifyStatic;
 import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ Utils.class })
+@PrepareForTest({ Utils.class, ReplDumpTask.class})
 @PowerMockIgnore({ "javax.management.*" })
 public class TestReplDumpTask {
 
@@ -111,12 +115,17 @@ public class TestReplDumpTask {
     when(hive.getAllFunctions()).thenReturn(Collections.emptyList());
     when(queryState.getConf()).thenReturn(conf);
     when(conf.getLong("hive.repl.last.repl.id", -1L)).thenReturn(1L);
+    when(conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)).thenReturn(false);
+
+    whenNew(Writer.class).withAnyArguments().thenReturn(mock(Writer.class));
+    whenNew(HiveWrapper.class).withAnyArguments().thenReturn(mock(HiveWrapper.class));
 
     ReplDumpTask task = new StubReplDumpTask() {
       private int tableDumpCount = 0;
 
       @Override
-      void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId, Hive hiveDb)
+      void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
+          long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple)
           throws Exception {
         tableDumpCount++;
         if (tableDumpCount > 1) {
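
The test now stubs constructor calls with whenNew, which only takes effect in classes listed in @PrepareForTest; that is why ReplDumpTask.class is added alongside Utils.class. A minimal, self-contained sketch of the pattern (WhenNewDemo, Caller and Collaborator are made-up names):

    import static org.junit.Assert.assertNull;
    import static org.mockito.Mockito.mock;
    import static org.powermock.api.mockito.PowerMockito.whenNew;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.powermock.core.classloader.annotations.PrepareForTest;
    import org.powermock.modules.junit4.PowerMockRunner;

    @RunWith(PowerMockRunner.class)
    @PrepareForTest({ WhenNewDemo.Caller.class })  // the class doing the "new", not the mocked one
    public class WhenNewDemo {

      static class Collaborator {
        String greet() { return "real"; }
      }

      static class Caller {
        String call() { return new Collaborator().greet(); }
      }

      @Test
      public void constructorIsIntercepted() throws Exception {
        whenNew(Collaborator.class).withAnyArguments().thenReturn(mock(Collaborator.class));
        // Caller was instrumented, so its "new Collaborator()" returns the mock,
        // whose greet() defaults to null instead of "real".
        assertNull(new Caller().call());
      }
    }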

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
index 5b75ca8..b9119a9 100644
--- a/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
+++ b/ql/src/test/queries/clientpositive/repl_2_exim_basic.q
@@ -68,7 +68,6 @@ show create table ext_t_imported;
 select * from ext_t_imported;
 
 -- should have repl.last.id
--- also - importing an external table replication export would turn the new table into a managed table
 import table ext_t_r_imported from 'ql/test/data/exports/ext_t_r';
 describe extended ext_t_imported;
 show table extended like ext_t_r_imported;

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out b/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
index 40b6ad7..950b5e4 100644
--- a/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
+++ b/ql/src/test/results/clientpositive/repl_2_exim_basic.q.out
@@ -411,7 +411,7 @@ PREHOOK: Input: default@ext_t_r_imported
 POSTHOOK: query: show create table ext_t_r_imported
 POSTHOOK: type: SHOW_CREATETABLE
 POSTHOOK: Input: default@ext_t_r_imported
-CREATE TABLE `ext_t_r_imported`(
+CREATE EXTERNAL TABLE `ext_t_r_imported`(
   `emp_id` int COMMENT 'employee id')
 PARTITIONED BY ( 
   `emp_country` string, 
@@ -425,7 +425,6 @@ OUTPUTFORMAT
 LOCATION
 #### A masked pattern was here ####
 TBLPROPERTIES (
-  'EXTERNAL'='FALSE', 
   'bucketing_version'='2', 
   'discover.partitions'='true', 
   'repl.last.id'='0', 

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/testutils/ptest2/conf/deployed/master-mr2.properties
----------------------------------------------------------------------
diff --git a/testutils/ptest2/conf/deployed/master-mr2.properties b/testutils/ptest2/conf/deployed/master-mr2.properties
index 6f0056a..9166f4a 100644
--- a/testutils/ptest2/conf/deployed/master-mr2.properties
+++ b/testutils/ptest2/conf/deployed/master-mr2.properties
@@ -68,7 +68,7 @@ ut.service.batchSize=8
 
 unitTests.module.itests.hive-unit=itests.hive-unit
 ut.itests.hive-unit.batchSize=9
-ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 TestJdbcWithMiniHA TestJdbcWithMiniMr TestReplicationScenariosIncrementalLoadAcidTables TestReplIncrementalLoadAcidTablesWithJsonMessage TestReplicationScenarios TestReplWithJsonMessageFormat
+ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 TestJdbcWithMiniHA TestJdbcWithMiniMr TestReplicationScenariosIncrementalLoadAcidTables TestReplIncrementalLoadAcidTablesWithJsonMessage TestReplicationScenarios TestReplWithJsonMessageFormat TestReplWithJsonMessageFormat
 
 unitTests.module.itests.qtest=itests.qtest
 ut.itests.qtest.batchSize=9
