PHOENIX-2334 CSV Bulk load fails on local indexes (Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/decbfe30 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/decbfe30 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/decbfe30 Branch: refs/heads/calcite Commit: decbfe3062bbc970050e03fbb198e61a2d30e88c Parents: c48fee0 Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Thu Feb 11 02:48:05 2016 +0530 Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Committed: Thu Feb 11 02:48:05 2016 +0530 ---------------------------------------------------------------------- .../phoenix/end2end/CsvBulkLoadToolIT.java | 27 +++++++++++--------- .../phoenix/mapreduce/AbstractBulkLoadTool.java | 16 ++++++++---- .../mapreduce/bulkload/TargetTableRef.java | 2 +- .../phoenix/query/ConnectionQueryServices.java | 1 + .../query/ConnectionQueryServicesImpl.java | 27 ++++++++++++++++++++ .../query/ConnectionlessQueryServicesImpl.java | 15 +++++++++++ .../query/DelegateConnectionQueryServices.java | 6 +++++ .../java/org/apache/phoenix/util/IndexUtil.java | 10 +++++++- 8 files changed, 85 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java index 26ec889..96042c5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java @@ -218,7 +218,7 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT { Statement stmt = conn.createStatement(); stmt.execute("CREATE TABLE TABLE6 (ID INTEGER NOT NULL 
PRIMARY KEY, " + - "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR) SPLIt ON (1,2)"); String ddl = "CREATE LOCAL INDEX TABLE6_IDX ON TABLE6 " + " (FIRST_NAME ASC)"; stmt.execute(ddl); @@ -234,16 +234,19 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT { CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); csvBulkLoadTool.setConf(getUtility().getConfiguration()); - try { - csvBulkLoadTool.run(new String[] { - "--input", "/tmp/input3.csv", - "--table", "table6", - "--zookeeper", zkQuorum}); - fail("Csv bulk load currently has issues with local indexes."); - } catch( UnsupportedOperationException ise) { - assertEquals("Local indexes not supported by Bulk Loader",ise.getMessage()); - } - + int exitCode = csvBulkLoadTool.run(new String[] { + "--input", "/tmp/input3.csv", + "--table", "table6", + "--zookeeper", zkQuorum}); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE6 where first_name='FirstName 2'"); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("FirstName 2", rs.getString(2)); + + rs.close(); + stmt.close(); } @Test @@ -251,7 +254,7 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT { testImportOneIndexTable("TABLE4", false); } - //@Test + @Test public void testImportOneLocalIndexTable() throws Exception { testImportOneIndexTable("TABLE5", true); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java index f6ba5f6..39ee4b1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java @@ -21,8 +21,10 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.commons.cli.CommandLine; @@ -54,6 +56,7 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; @@ -295,7 +298,12 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { } private void completebulkload(Configuration conf,Path outputPath , List<TargetTableRef> tablesToBeLoaded) throws Exception { + Set<String> tableNames = new HashSet<>(tablesToBeLoaded.size()); for(TargetTableRef table : tablesToBeLoaded) { + if(tableNames.contains(table.getPhysicalName())){ + continue; + } + tableNames.add(table.getPhysicalName()); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); String tableName = table.getPhysicalName(); Path tableOutputPath = new Path(outputPath,tableName); @@ -382,11 +390,9 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>(); for(PTable indexTable : table.getIndexes()){ if (indexTable.getIndexType() == PTable.IndexType.LOCAL) { - throw new UnsupportedOperationException("Local indexes not supported by Bulk Loader"); - /*indexTables.add( - new TargetTableRef(getQualifiedTableName(schemaName, - indexTable.getTableName().getString()), - MetaDataUtil.getLocalIndexTableName(qualifiedTableName))); */ + indexTables.add(new 
TargetTableRef(getQualifiedTableName(schemaName, indexTable + .getTableName().getString()), MetaDataUtil + .getLocalIndexTableName(qualifiedTableName))); } else { indexTables.add(new TargetTableRef(getQualifiedTableName(schemaName, indexTable.getTableName().getString()))); http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java index 1a846f9..2c3069f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java @@ -46,7 +46,7 @@ public class TargetTableRef { } @JsonCreator - private TargetTableRef(@JsonProperty("logicalName") String logicalName, + public TargetTableRef(@JsonProperty("logicalName") String logicalName, @JsonProperty("physicalName") String physicalName) { this.logicalName = logicalName; this.physicalName = physicalName; http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 25d7ff4..b5f1f85 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -69,6 +69,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated public HTableDescriptor getTableDescriptor(byte[] tableName) throws SQLException; + public 
HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException; public List<HRegionLocation> getAllTableRegions(byte[] tableName) throws SQLException; public PhoenixConnection connect(String url, Properties info) throws SQLException; http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index b29e3d9..897c207 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -3368,4 +3368,31 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public boolean isRenewingLeasesEnabled() { return supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE) && renewLeaseEnabled; } + + + @Override + public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException { + /* + * Use HConnection.getRegionLocation as it uses the cache in HConnection, to get the region + * to which specified row belongs to. 
+ */ + int retryCount = 0, maxRetryCount = 1; + boolean reload =false; + while (true) { + try { + return connection.getRegionLocation(TableName.valueOf(tableName), row, reload); + } catch (org.apache.hadoop.hbase.TableNotFoundException e) { + String fullName = Bytes.toString(tableName); + throw new TableNotFoundException(SchemaUtil.getSchemaNameFromFullName(fullName), SchemaUtil.getTableNameFromFullName(fullName)); + } catch (IOException e) { + if (retryCount++ < maxRetryCount) { // One retry, in case split occurs while navigating + reload = true; + continue; + } + throw new SQLExceptionInfo.Builder(SQLExceptionCode.GET_TABLE_REGIONS_FAIL) + .setRootCause(e).build().buildException(); + } + } + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 6cfb382..b4bbe1f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -577,4 +577,19 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple public boolean isRenewingLeasesEnabled() { return false; } + + public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) throws SQLException { + List<HRegionLocation> regions = tableSplits.get(Bytes.toString(tableName)); + if (regions != null) { + for(HRegionLocation region: regions) { + if (Bytes.compareTo(region.getRegionInfo().getStartKey(), row) <= 0 + && Bytes.compareTo(region.getRegionInfo().getEndKey(), row) > 0) { + return region; + } + } + } + return new HRegionLocation( + new HRegionInfo(TableName.valueOf(tableName), 
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW), + SERVER_NAME, -1); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 9b721f8..4c7446b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -304,4 +304,10 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple public boolean isRenewingLeasesEnabled() { return getDelegate().isRenewingLeasesEnabled(); } + + @Override + public HRegionLocation getTableRegionLocation(byte[] tableName, byte[] row) + throws SQLException { + return getDelegate().getTableRegionLocation(tableName, row); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/decbfe30/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index f361fb9..98b88f4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; @@ -283,7 +284,14 @@ public class IndexUtil { } }; - 
indexMutations.add(maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, null, null)); + byte[] regionStartKey = null; + byte[] regionEndkey = null; + if(maintainer.isLocalIndex()) { + HRegionLocation tableRegionLocation = connection.getQueryServices().getTableRegionLocation(table.getName().getBytes(), dataMutation.getRow()); + regionStartKey = tableRegionLocation.getRegionInfo().getStartKey(); + regionEndkey = tableRegionLocation.getRegionInfo().getEndKey(); + } + indexMutations.add(maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, ts, regionStartKey, regionEndkey)); } } return indexMutations;