Fix PhoenixMRJobSubmitter submitting duplicate MR jobs for an index build 
started from IndexTool


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/670e14fc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/670e14fc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/670e14fc

Branch: refs/heads/4.x-cdh5.13
Commit: 670e14fc109d8580e88adf9f97b725ef32042f88
Parents: 3768da3
Author: Xu Cang <xc...@salesforce.com>
Authored: Tue May 8 00:21:46 2018 +0100
Committer: Pedro Boado <pbo...@apache.org>
Committed: Sun May 13 10:45:44 2018 +0100

----------------------------------------------------------------------
 .../phoenix/mapreduce/index/IndexTool.java      |  6 ++--
 .../index/automation/PhoenixAsyncIndex.java     |  2 +-
 .../index/automation/PhoenixMRJobSubmitter.java |  2 +-
 .../index/automated/MRJobSubmitterTest.java     | 30 +++++++++++++++-----
 4 files changed, 28 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/670e14fc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 671e4cf..e3aa729 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -123,7 +123,7 @@ public class IndexTool extends Configured implements Tool {
     private static final Option SNAPSHOT_OPTION = new Option("snap", 
"snapshot", false,
         "If specified, uses Snapshots for async index building (optional)");
     private static final Option HELP_OPTION = new Option("h", "help", false, 
"Help");
-    public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s_INDX_%s";
+    public static final String INDEX_JOB_NAME_TEMPLATE = 
"PHOENIX_%s.%s_INDX_%s";
 
     private Options getOptions() {
         final Options options = new Options();
@@ -373,9 +373,9 @@ public class IndexTool extends Configured implements Tool {
                     PhoenixRuntime.generateColumnInfo(connection, qIndexTable, 
indexColumns);
             ColumnInfoToStringEncoderDecoder.encode(configuration, 
columnMetadataList);
             fs = outputPath.getFileSystem(configuration);
-            fs.delete(outputPath, true);           
+            fs.delete(outputPath, true);
  
-            final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, 
pdataTable.getName().toString(), indexTable);
+            final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, 
schemaName, dataTable, indexTable);
             final Job job = Job.getInstance(configuration, jobName);
             job.setJarByClass(IndexTool.class);
             job.setMapOutputKeyClass(ImmutableBytesWritable.class);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670e14fc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java
index 3e88cd0..a61e49a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java
@@ -59,7 +59,7 @@ public class PhoenixAsyncIndex {
     }
 
     public String getJobName() {
-        return String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, dataTableName, 
tableName);
+        return String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, tableSchem, 
dataTableName, tableName);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670e14fc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
index d86802a..3e20bd2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
@@ -216,7 +216,7 @@ public class PhoenixMRJobSubmitter {
             
indexInfo.setTableSchem(rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM));
             
indexInfo.setTableName(rs.getString(PhoenixDatabaseMetaData.TABLE_NAME));
             
candidateIndexes.put(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
-                indexInfo.getDataTableName(), indexInfo.getTableName()), 
indexInfo);
+                indexInfo.getTableSchem(), indexInfo.getDataTableName(), 
indexInfo.getTableName()), indexInfo);
         }
 
         return candidateIndexes;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/670e14fc/phoenix-core/src/test/java/org/apache/phoenix/index/automated/MRJobSubmitterTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/index/automated/MRJobSubmitterTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/index/automated/MRJobSubmitterTest.java
index 5ed1837..3a4de4c 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/index/automated/MRJobSubmitterTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/index/automated/MRJobSubmitterTest.java
@@ -43,18 +43,20 @@ public class MRJobSubmitterTest {
         PhoenixAsyncIndex index1 = new PhoenixAsyncIndex();
         index1.setDataTableName("DT1");
         index1.setTableName("IT1");
+        index1.setTableSchem("NEW_SCHEM1");
         index1.setIndexType(IndexType.LOCAL);
 
         candidateJobs.put(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
-            index1.getDataTableName(), index1.getTableName()), index1);
+            index1.getTableSchem(), index1.getDataTableName(), 
index1.getTableName()), index1);
 
         PhoenixAsyncIndex index2 = new PhoenixAsyncIndex();
         index2.setDataTableName("DT2");
         index2.setTableName("IT2");
+        index2.setTableSchem("NEW_SCHEM2");
         index2.setIndexType(IndexType.LOCAL);
 
         candidateJobs.put(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
-            index2.getDataTableName(), index2.getTableName()), index2);
+            index2.getTableSchem(), index2.getDataTableName(), 
index2.getTableName()), index2);
     }
 
     @Test
@@ -71,6 +73,20 @@ public class MRJobSubmitterTest {
     }
 
     @Test
+    public void testIndexJobsName() throws IOException {
+        // Verify index job name contains schem name, not only table name.
+        PhoenixAsyncIndex index = new PhoenixAsyncIndex();
+        index.setDataTableName("MyDataTable");
+        index.setTableName("MyTableName");
+        index.setTableSchem("MySchem");
+        index.setIndexType(IndexType.LOCAL);
+
+        String jobName = String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
+                index.getTableSchem(), index.getDataTableName(), 
index.getTableName());
+        assertEquals("PHOENIX_MySchem.MyDataTable_INDX_MyTableName", jobName);
+    }
+
+    @Test
     public void testGlobalIndexJobsForSubmission() throws IOException {
 
         // Set the index type to GLOBAL
@@ -91,7 +107,7 @@ public class MRJobSubmitterTest {
         
         // Mark one job as running
         submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
-            jobs[0].getDataTableName(), jobs[0].getTableName()));
+            jobs[0].getTableSchem(), jobs[0].getDataTableName(), 
jobs[0].getTableName()));
 
         PhoenixMRJobSubmitter submitter = new PhoenixMRJobSubmitter();
         Set<PhoenixAsyncIndex> jobsToSubmit =
@@ -110,9 +126,9 @@ public class MRJobSubmitterTest {
         
         // Mark all the candidate jobs as running/in-progress
         submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
-            jobs[0].getDataTableName(), jobs[0].getTableName()));
+                jobs[0].getTableSchem(), jobs[0].getDataTableName(), 
jobs[0].getTableName()));
         submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
-            jobs[1].getDataTableName(), jobs[1].getTableName()));
+                jobs[1].getTableSchem(), jobs[1].getDataTableName(), 
jobs[1].getTableName()));
 
         PhoenixMRJobSubmitter submitter = new PhoenixMRJobSubmitter();
         Set<PhoenixAsyncIndex> jobsToSubmit =
@@ -126,9 +142,9 @@ public class MRJobSubmitterTest {
         candidateJobs.clear();
         // Add some dummy running jobs to the submitted list
         submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
-            "d1", "i1"));
+            "s1", "d1", "i1"));
         submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
-            "d2", "i2"));
+            "s2", "d2", "i2"));
         PhoenixMRJobSubmitter submitter = new PhoenixMRJobSubmitter();
         Set<PhoenixAsyncIndex> jobsToSubmit =
                 submitter.getJobsToSubmit(candidateJobs, submittedJobs);

Reply via email to