PHOENIX-2334 CSV Bulk load fails on local indexes (Rajeshbabu)

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/decbfe30
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/decbfe30
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/decbfe30

Branch: refs/heads/calcite
Commit: decbfe3062bbc970050e03fbb198e61a2d30e88c
Parents: c48fee0
Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org>
Authored: Thu Feb 11 02:48:05 2016 +0530
Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org>
Committed: Thu Feb 11 02:48:05 2016 +0530

----------------------------------------------------------------------
 .../phoenix/end2end/CsvBulkLoadToolIT.java      | 27 +++++++++++---------
 .../phoenix/mapreduce/AbstractBulkLoadTool.java | 16 ++++++++----
 .../mapreduce/bulkload/TargetTableRef.java      |  2 +-
 .../phoenix/query/ConnectionQueryServices.java  |  1 +
 .../query/ConnectionQueryServicesImpl.java      | 27 ++++++++++++++++++++
 .../query/ConnectionlessQueryServicesImpl.java  | 15 +++++++++++
 .../query/DelegateConnectionQueryServices.java  |  6 +++++
 .../java/org/apache/phoenix/util/IndexUtil.java | 10 +++++++-
 8 files changed, 85 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 26ec889..96042c5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -218,7 +218,7 @@ public class CsvBulkLoadToolIT extends 
BaseOwnClusterHBaseManagedTimeIT {
 
         Statement stmt = conn.createStatement();
         stmt.execute("CREATE TABLE TABLE6 (ID INTEGER NOT NULL PRIMARY KEY, " +
-                "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+                "FIRST_NAME VARCHAR, LAST_NAME VARCHAR) SPLIt ON (1,2)");
         String ddl = "CREATE LOCAL INDEX TABLE6_IDX ON TABLE6 "
                 + " (FIRST_NAME ASC)";
         stmt.execute(ddl);
@@ -234,16 +234,19 @@ public class CsvBulkLoadToolIT extends 
BaseOwnClusterHBaseManagedTimeIT {
 
         CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
         csvBulkLoadTool.setConf(getUtility().getConfiguration());
-        try {
-            csvBulkLoadTool.run(new String[] {
-                    "--input", "/tmp/input3.csv",
-                    "--table", "table6",
-                    "--zookeeper", zkQuorum});
-            fail("Csv bulk load currently has issues with local indexes.");
-        } catch( UnsupportedOperationException ise) {
-            assertEquals("Local indexes not supported by Bulk 
Loader",ise.getMessage());
-        }
-        
+        int exitCode = csvBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input3.csv",
+                "--table", "table6",
+                "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE6 
where first_name='FirstName 2'");
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("FirstName 2", rs.getString(2));
+
+        rs.close();
+        stmt.close();
     }
 
     @Test
@@ -251,7 +254,7 @@ public class CsvBulkLoadToolIT extends 
BaseOwnClusterHBaseManagedTimeIT {
         testImportOneIndexTable("TABLE4", false);
     }
 
-    //@Test
+    @Test
     public void testImportOneLocalIndexTable() throws Exception {
         testImportOneIndexTable("TABLE5", true);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index f6ba5f6..39ee4b1 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -21,8 +21,10 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.commons.cli.CommandLine;
@@ -54,6 +56,7 @@ import 
org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -295,7 +298,12 @@ public abstract class AbstractBulkLoadTool extends 
Configured implements Tool {
     }
 
     private void completebulkload(Configuration conf,Path outputPath , 
List<TargetTableRef> tablesToBeLoaded) throws Exception {
+        Set<String> tableNames = new HashSet<>(tablesToBeLoaded.size());
         for(TargetTableRef table : tablesToBeLoaded) {
+            if(tableNames.contains(table.getPhysicalName())){
+                continue;
+            }
+            tableNames.add(table.getPhysicalName());
             LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
             String tableName = table.getPhysicalName();
             Path tableOutputPath = new Path(outputPath,tableName);
@@ -382,11 +390,9 @@ public abstract class AbstractBulkLoadTool extends 
Configured implements Tool {
         List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
         for(PTable indexTable : table.getIndexes()){
             if (indexTable.getIndexType() == PTable.IndexType.LOCAL) {
-                throw new UnsupportedOperationException("Local indexes not 
supported by Bulk Loader");
-                /*indexTables.add(
-                        new TargetTableRef(getQualifiedTableName(schemaName,
-                                indexTable.getTableName().getString()),
-                                
MetaDataUtil.getLocalIndexTableName(qualifiedTableName))); */
+                indexTables.add(new 
TargetTableRef(getQualifiedTableName(schemaName, indexTable
+                        .getTableName().getString()), MetaDataUtil
+                        .getLocalIndexTableName(qualifiedTableName)));
             } else {
                 indexTables.add(new 
TargetTableRef(getQualifiedTableName(schemaName,
                         indexTable.getTableName().getString())));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
index 1a846f9..2c3069f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
@@ -46,7 +46,7 @@ public class TargetTableRef {
     }
 
     @JsonCreator
-    private TargetTableRef(@JsonProperty("logicalName") String logicalName,
+    public TargetTableRef(@JsonProperty("logicalName") String logicalName,
         @JsonProperty("physicalName") String physicalName) {
         this.logicalName = logicalName;
         this.physicalName = physicalName;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index 25d7ff4..b5f1f85 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -69,6 +69,7 @@ public interface ConnectionQueryServices extends 
QueryServices, MetaDataMutated
 
     public HTableDescriptor getTableDescriptor(byte[] tableName) throws 
SQLException;
 
+    public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] 
row) throws SQLException;
     public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws 
SQLException;
 
     public PhoenixConnection connect(String url, Properties info) throws 
SQLException;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index b29e3d9..897c207 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -3368,4 +3368,31 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
     public boolean isRenewingLeasesEnabled() {
         return supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE) && 
renewLeaseEnabled;
     }
+
+
+    @Override
+    public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] 
row) throws SQLException {
+       /*
+        * Use HConnection.getRegionLocation as it uses the cache in 
HConnection, to get the region
+        * to which specified row belongs to.
+         */
+        int retryCount = 0, maxRetryCount = 1;
+        boolean reload =false;
+        while (true) {
+                try {
+                        return 
connection.getRegionLocation(TableName.valueOf(tableName), row, reload);
+                } catch (org.apache.hadoop.hbase.TableNotFoundException e) {
+                        String fullName = Bytes.toString(tableName);
+                        throw new 
TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(fullName), 
SchemaUtil.getTableNameFromFullName(fullName));
+                } catch (IOException e) {
+                        if (retryCount++ < maxRetryCount) { // One retry, in 
case split occurs while navigating
+                                reload = true;
+                                continue;
+                        }
+                        throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.GET_TABLE_REGIONS_FAIL)
+                        .setRootCause(e).build().buildException();
+                }
+        }
+     }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 6cfb382..b4bbe1f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -577,4 +577,19 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
     public boolean isRenewingLeasesEnabled() {
         return false;
     }
+
+    public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] 
row) throws SQLException {
+       List<HRegionLocation> regions = 
tableSplits.get(Bytes.toString(tableName));
+       if (regions != null) {
+               for(HRegionLocation region: regions) {
+                       if 
(Bytes.compareTo(region.getRegionInfo().getStartKey(), row) <= 0
+                                       && 
Bytes.compareTo(region.getRegionInfo().getEndKey(), row) > 0) {
+                           return region;
+                       }
+               }
+       }
+       return new HRegionLocation(
+                       new HRegionInfo(TableName.valueOf(tableName), 
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW),
+                       SERVER_NAME, -1);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 9b721f8..4c7446b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -304,4 +304,10 @@ public class DelegateConnectionQueryServices extends 
DelegateQueryServices imple
     public boolean isRenewingLeasesEnabled() {
         return getDelegate().isRenewingLeasesEnabled();
     }
+
+    @Override
+    public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row)
+            throws SQLException {
+        return getDelegate().getTableRegionLocation(tableName, row);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index f361fb9..98b88f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
@@ -283,7 +284,14 @@ public class IndexUtil {
                         }
                         
                     };
-                    
indexMutations.add(maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, 
ts, null, null));
+                    byte[] regionStartKey = null;
+                    byte[] regionEndkey = null;
+                    if(maintainer.isLocalIndex()) {
+                        HRegionLocation tableRegionLocation = 
connection.getQueryServices().getTableRegionLocation(table.getName().getBytes(),
 dataMutation.getRow());
+                        regionStartKey = 
tableRegionLocation.getRegionInfo().getStartKey();
+                        regionEndkey = 
tableRegionLocation.getRegionInfo().getEndKey();
+                    }
+                    
indexMutations.add(maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, 
ts, regionStartKey, regionEndkey));
                 }
             }
             return indexMutations;

Reply via email to