This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.2 by this push:
     new 72d1836  PHOENIX-5018 Index mutations created by UPSERT SELECT will have wrong timestamps
72d1836 is described below

commit 72d1836ca214a9141d2457597a66926bde5f6a9f
Author: Kadir <kozde...@salesforce.com>
AuthorDate: Tue Jan 29 17:14:02 2019 -0800

    PHOENIX-5018 Index mutations created by UPSERT SELECT will have wrong timestamps

    Signed-off-by: Geoffrey Jacoby <gjac...@apache.org>
---
 .../phoenix/end2end/IndexBuildTimestampIT.java     | 248 +++++++++++++++++++++
 .../org/apache/phoenix/end2end/IndexToolIT.java    |  37 +++
 .../phoenix/end2end/TableDDLPermissionsIT.java     |   8 -
 .../org/apache/phoenix/rpc/PhoenixServerRpcIT.java |   6 -
 .../phoenix/compile/ServerBuildIndexCompiler.java  | 138 ++++++++++++
 .../org/apache/phoenix/index/IndexMaintainer.java  |   3 +-
 .../phoenix/mapreduce/PhoenixInputFormat.java      |   3 +-
 .../phoenix/mapreduce/PhoenixRecordReader.java     |   4 +-
 .../PhoenixServerBuildIndexInputFormat.java        | 111 +++++++++
 .../apache/phoenix/mapreduce/index/IndexTool.java  | 243 ++++++++++++--------
 .../index/PhoenixServerBuildIndexMapper.java       |  75 +++++++
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  25 +++
 .../mapreduce/util/PhoenixMapReduceUtil.java       |  27 +++
 .../org/apache/phoenix/schema/MetaDataClient.java  |  16 +-
 14 files changed, 820 insertions(+), 124 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
new file mode 100644
index 0000000..50be0b8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.phoenix.end2end; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.EnvironmentEdge; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import com.google.common.collect.Lists; + +@RunWith(Parameterized.class) +public class IndexBuildTimestampIT extends BaseUniqueNamesOwnClusterIT { + private final boolean localIndex; + private final boolean async; + private final boolean view; + private final String tableDDLOptions; + + public IndexBuildTimestampIT(boolean mutable, boolean localIndex, + boolean async, boolean view) { + this.localIndex = localIndex; + this.async = async; + this.view = view; + StringBuilder optionBuilder = new StringBuilder(); + if (!mutable) { + optionBuilder.append(" IMMUTABLE_ROWS=true "); + } + optionBuilder.append(" SPLIT ON(1,2)"); + this.tableDDLOptions = optionBuilder.toString(); + } + + @BeforeClass + public static void setup() throws Exception { + IndexToolIT.setup(); + } + + @Parameters( + name = "mutable={0},localIndex={1},async={2},view={3}") + public static Collection<Object[]> data() { + List<Object[]> list = Lists.newArrayListWithExpectedSize(8); + boolean[] Booleans = new boolean[]{false, true}; + for (boolean mutable : Booleans) { + for (boolean localIndex : Booleans) { + for (boolean async : Booleans) { + for (boolean view : Booleans) { + list.add(new Object[]{mutable, localIndex, async, view}); + } + } + } + } + return list; + } + + public static void assertExplainPlan(Connection conn, boolean localIndex, String selectSql, + String dataTableFullName, String indexTableFullName) throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); + String actualExplainPlan = QueryUtil.getExplainPlan(rs); + + IndexToolIT.assertExplainPlan(localIndex, actualExplainPlan, dataTableFullName, indexTableFullName); + } + + private class MyClock extends EnvironmentEdge { + long initialTime; + long delta; + + public MyClock(long delta) { + initialTime = System.currentTimeMillis() + delta; + this.delta = delta; + } + + @Override + public long currentTime() { + return System.currentTimeMillis() + delta; + } + + public long initialTime() { + return initialTime; + } + } + + private void populateTable(String tableName, MyClock clock1, MyClock clock2) throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("create table " + tableName + + " (id varchar(10) not null primary key, val varchar(10), ts timestamp)" + tableDDLOptions); + + EnvironmentEdgeManager.injectEdge(clock1); + 
conn.createStatement().execute("upsert into " + tableName + " values ('aaa', 'abc', current_date())"); + conn.commit(); + + EnvironmentEdgeManager.injectEdge(clock2); + conn.createStatement().execute("upsert into " + tableName + " values ('bbb', 'bcd', current_date())"); + conn.commit(); + conn.close(); + + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(clock1.initialTime())); + conn = DriverManager.getConnection(getUrl(), props); + ResultSet rs = conn.createStatement().executeQuery("select * from " + tableName); + assertFalse(rs.next()); + conn.close(); + + props.setProperty("CurrentSCN", Long.toString(clock2.initialTime())); + conn = DriverManager.getConnection(getUrl(), props); + rs = conn.createStatement().executeQuery("select * from " + tableName); + + assertTrue(rs.next()); + assertEquals("aaa", rs.getString(1)); + assertEquals("abc", rs.getString(2)); + assertNotNull(rs.getDate(3)); + + assertFalse(rs.next()); + conn.close(); + + props.setProperty("CurrentSCN", Long.toString(clock2.currentTime())); + conn = DriverManager.getConnection(getUrl(), props); + rs = conn.createStatement().executeQuery("select * from " + tableName); + + assertTrue(rs.next()); + assertEquals("aaa", rs.getString(1)); + assertEquals("abc", rs.getString(2)); + assertNotNull(rs.getDate(3)); + + assertTrue(rs.next()); + assertEquals("bbb", rs.getString(1)); + assertEquals("bcd", rs.getString(2)); + assertNotNull(rs.getDate(3)); + assertFalse(rs.next()); + conn.close(); + } + + @Test + public void testCellTimestamp() throws Exception { + EnvironmentEdgeManager.reset(); + MyClock clock1 = new MyClock(100000); + MyClock clock2 = new MyClock(200000); + String dataTableName = generateUniqueName(); + populateTable(dataTableName, clock1, clock2); + + MyClock clock3 = new MyClock(300000); + EnvironmentEdgeManager.injectEdge(clock3); + + Properties props = new Properties(); + props.setProperty(QueryServices.ENABLE_SERVER_SIDE_MUTATIONS, "true"); + Connection conn = DriverManager.getConnection(getUrl(), props); + + String viewName = null; + if (view) { + viewName = generateUniqueName(); + conn.createStatement().execute("CREATE VIEW "+ viewName + " AS SELECT * FROM " + + dataTableName); + } + String indexName = generateUniqueName(); + conn.createStatement().execute("CREATE "+ (localIndex ? "LOCAL " : "") + " INDEX " + indexName + " on " + + (view ? viewName : dataTableName) + " (val) include (ts)" + (async ? "ASYNC" : "")); + + conn.close(); + + if (async) { + // run the index MR job. + IndexToolIT.runIndexTool(true, false, null, (view ? viewName : dataTableName), indexName); + } + + // Verify the index timestamps via Phoenix + String selectSql = String.format("SELECT * FROM %s WHERE val = 'abc'", (view ? viewName : dataTableName)); + conn = DriverManager.getConnection(getUrl()); + // assert we are pulling from index table + assertExplainPlan(conn, localIndex, selectSql, dataTableName, (view ? "_IDX_" + dataTableName : indexName)); + ResultSet rs = conn.createStatement().executeQuery(selectSql); + assertTrue (rs.next()); + assertTrue(rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() < clock2.initialTime() && + rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() >= clock1.initialTime()); + + selectSql = + String.format("SELECT * FROM %s WHERE val = 'bcd'", (view ? viewName : dataTableName)); + // assert we are pulling from index table + assertExplainPlan(conn, localIndex, selectSql, dataTableName, (view ? 
"_IDX_" + dataTableName : indexName)); + + rs = conn.createStatement().executeQuery(selectSql); + assertTrue (rs.next()); + assertTrue(rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() < clock3.initialTime() && + rs.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp() >= clock2.initialTime() + ); + assertFalse (rs.next()); + + // Verify the index timestamps via HBase + PTable pIndexTable = PhoenixRuntime.getTable(conn, indexName); + Table table = conn.unwrap(PhoenixConnection.class).getQueryServices() + .getTable(pIndexTable.getPhysicalName().getBytes()); + + Scan scan = new Scan(); + scan.setTimeRange(clock3.initialTime(), clock3.currentTime()); + ResultScanner scanner = table.getScanner(scan); + assertTrue(scanner.next() == null); + + + scan = new Scan(); + scan.setTimeRange(clock2.initialTime(), clock3.initialTime()); + scanner = table.getScanner(scan); + assertTrue(scanner.next() != null); + + + scan = new Scan(); + scan.setTimeRange(clock1.initialTime(), clock2.initialTime()); + scanner = table.getScanner(scan); + assertTrue(scanner.next() != null); + conn.close(); + EnvironmentEdgeManager.reset(); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index c185f39..8188e53 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -47,11 +47,17 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Job; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.index.IndexTool; import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper; +import org.apache.phoenix.mapreduce.index.PhoenixIndexImportMapper; +import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper; + import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.PhoenixRuntime; @@ -69,6 +75,7 @@ import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.Lists; import com.google.common.collect.Maps; + @RunWith(Parameterized.class) public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { @@ -498,6 +505,32 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, new String[0]); } + private static void verifyMapper(Job job, boolean directApi, boolean useSnapshot, String schemaName, + String dataTableName, String indexTableName, String tenantId) throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + if (tenantId != null) { + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + } + try (Connection conn = + DriverManager.getConnection(getUrl(), props)) { + PTable dataTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, dataTableName)); + PTable indexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indexTableName)); + boolean transactional = 
dataTable.isTransactional(); + boolean localIndex = PTable.IndexType.LOCAL.equals(indexTable.getIndexType()); + + if (directApi) { + if ((localIndex || !transactional) && !useSnapshot) { + assertEquals(job.getMapperClass(), PhoenixServerBuildIndexMapper.class); + } else { + assertEquals(job.getMapperClass(), PhoenixIndexImportDirectMapper.class); + } + } + else { + assertEquals(job.getMapperClass(), PhoenixIndexImportMapper.class); + } + } + } + public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName, String dataTableName, String indexTableName, String... additionalArgs) throws Exception { runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, additionalArgs); @@ -515,6 +548,10 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs)); cmdArgList.addAll(Arrays.asList(additionalArgs)); int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()])); + + if (expectedStatus == 0) { + verifyMapper(indexingTool.getJob(), directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId); + } assertEquals(expectedStatus, status); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java index 86a6b60..d29056d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java @@ -204,14 +204,7 @@ public class TableDDLPermissionsIT extends BasePermissionsIT { // we should be able to read the data from another index as well to which we have not given any access to // this user - verifyAllowed(createIndex(indexName2, phoenixTableName), unprivilegedUser); verifyAllowed(readTable(phoenixTableName, indexName1), unprivilegedUser); - verifyAllowed(readTable(phoenixTableName, indexName2), unprivilegedUser); - verifyAllowed(rebuildIndex(indexName2, phoenixTableName), unprivilegedUser); - - // data table user should be able to read new index - verifyAllowed(rebuildIndex(indexName2, phoenixTableName), regularUser1); - verifyAllowed(readTable(phoenixTableName, indexName2), regularUser1); verifyAllowed(readTable(phoenixTableName), regularUser1); verifyAllowed(rebuildIndex(indexName1, phoenixTableName), regularUser1); @@ -220,7 +213,6 @@ public class TableDDLPermissionsIT extends BasePermissionsIT { verifyAllowed(dropView(viewName1), regularUser1); verifyAllowed(dropView(viewName2), regularUser1); verifyAllowed(dropColumn(phoenixTableName, "val1"), regularUser1); - verifyAllowed(dropIndex(indexName2, phoenixTableName), regularUser1); verifyAllowed(dropIndex(indexName1, phoenixTableName), regularUser1); verifyAllowed(dropTable(phoenixTableName), regularUser1); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java index 1c18667..ab05c16 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java @@ -140,12 +140,6 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT { assertEquals("k1", rs.getString(1)); assertEquals("v2", rs.getString(2)); assertFalse(rs.next()); - - TestPhoenixIndexRpcSchedulerFactory.reset(); - createIndex(conn, indexName + "_1"); - // Verify that that index queue is not used since 
running upsert select on server side has been disabled - // See PHOENIX-4171 - Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(), Mockito.never()).dispatch(Mockito.any(CallRunner.class)); } finally { conn.close(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java new file mode 100644 index 0000000..7d1c1b4 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.compile; + +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.schema.*; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.StringUtil; + +import com.google.common.collect.Lists; + +import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; + + +/** + * Class that compiles plan to generate initial data values after a DDL command for + * index table. 
+ */ +public class ServerBuildIndexCompiler { + private final PhoenixConnection connection; + private final String tableName; + private PTable dataTable; + private QueryPlan plan; + + private class RowCountMutationPlan extends BaseMutationPlan { + private RowCountMutationPlan(StatementContext context, PhoenixStatement.Operation operation) { + super(context, operation); + } + @Override + public MutationState execute() throws SQLException { + connection.getMutationState().commitDDLFence(dataTable); + Tuple tuple = plan.iterator().next(); + long rowCount = 0; + if (tuple != null) { + Cell kv = tuple.getValue(0); + ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); + // A single Cell will be returned with the count(*) - we decode that here + rowCount = PLong.INSTANCE.getCodec().decodeLong(tmpPtr, SortOrder.getDefault()); + } + // The contract is to return a MutationState that contains the number of rows modified. In this + // case, it's the number of rows in the data table which corresponds to the number of index + // rows that were added. + return new MutationState(0, 0, connection, rowCount); + } + + @Override + public QueryPlan getQueryPlan() { + return plan; + } + }; + + public ServerBuildIndexCompiler(PhoenixConnection connection, String tableName) { + this.connection = connection; + this.tableName = tableName; + } + + public MutationPlan compile(PTable index) throws SQLException { + try (final PhoenixStatement statement = new PhoenixStatement(connection)) { + String query = "SELECT count(*) FROM " + tableName; + this.plan = statement.compileQuery(query); + TableRef tableRef = plan.getTableRef(); + Scan scan = plan.getContext().getScan(); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + dataTable = tableRef.getTable(); + if (index.getIndexType() == PTable.IndexType.GLOBAL && dataTable.isTransactional()) { + throw new IllegalArgumentException( + "ServerBuildIndexCompiler does not support global indexes on transactional tables"); + } + IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(index), plan.getContext().getConnection()); + // Set the scan attributes that UngroupedAggregateRegionObserver will switch on. + // For local indexes, the BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO attribute, and + // for global indexes PhoenixIndexCodec.INDEX_PROTO_MD attribute is set to the serialized form of index + // metadata to build index rows from data table rows. For global indexes, we also need to set (1) the + // BaseScannerRegionObserver.REBUILD_INDEXES attribute in order to signal UngroupedAggregateRegionObserver + // that this scan is for building global indexes and (2) the MetaDataProtocol.PHOENIX_VERSION attribute + // that will be passed as a mutation attribute for the scanned mutations that will be applied on + // the index table possibly remotely + if (index.getIndexType() == PTable.IndexType.LOCAL) { + scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, ByteUtil.copyKeyBytesIfNecessary(ptr)); + } else { + scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES); + ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION); + } + // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*). + // However, in this case, we need to project all of the data columns that contribute to the index. 
+ IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection); + for (ColumnReference columnRef : indexMaintainer.getAllColumns()) { + if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) { + scan.addFamily(columnRef.getFamily()); + } else { + scan.addColumn(columnRef.getFamily(), columnRef.getQualifier()); + } + } + + if (dataTable.isTransactional()) { + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction()); + } + + // Go through MutationPlan abstraction so that we can create local indexes + // with a connectionless connection (which makes testing easier). + return new RowCountMutationPlan(plan.getContext(), PhoenixStatement.Operation.UPSERT); + } + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index d3d14d8..cb09dc4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -964,8 +964,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // add the keyvalue for the empty row put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey), this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts, - // set the value to the empty column name - dataEmptyKeyValueRef.getQualifierWritable())); + QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR)); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java index 6093edd..11c412c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java @@ -71,7 +71,6 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr @Override public RecordReader<NullWritable,T> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - final Configuration configuration = context.getConfiguration(); final QueryPlan queryPlan = getQueryPlan(context,configuration); @SuppressWarnings("unchecked") @@ -163,7 +162,7 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr * @throws IOException * @throws SQLException */ - private QueryPlan getQueryPlan(final JobContext context, final Configuration configuration) + protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration) throws IOException { Preconditions.checkNotNull(context); try { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java index 58c048b..b7e1373 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java @@ -60,8 +60,8 @@ import com.google.common.collect.Lists; public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<NullWritable,T> { private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class); - private final Configuration 
configuration; - private final QueryPlan queryPlan; + protected final Configuration configuration; + protected final QueryPlan queryPlan; private NullWritable key = NullWritable.get(); private T value = null; private Class<T> inputClass; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java new file mode 100644 index 0000000..f8ec393 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.compile.*; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.iterate.MapReduceParallelScanGrouper; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.schema.*; +import org.apache.phoenix.util.*; + +import com.google.common.base.Preconditions; +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName; +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName; + +/** + * {@link InputFormat} implementation from Phoenix for building index + * + */ +public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends PhoenixInputFormat { + QueryPlan queryPlan = null; + + private static final Log LOG = LogFactory.getLog(PhoenixServerBuildIndexInputFormat.class); + + /** + * instantiated by framework + */ + public PhoenixServerBuildIndexInputFormat() { + } + + @Override + protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration) + throws IOException { + Preconditions.checkNotNull(context); + if (queryPlan != null) { + return queryPlan; + } + final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE); + final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); + final String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID); + 
final Properties overridingProps = new Properties(); + if(txnScnValue==null && currentScnValue!=null) { + overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue); + overridingProps.put(PhoenixRuntime.BUILD_INDEX_AT_ATTRIB, currentScnValue); + } + if (tenantId != null && configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null){ + overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + } + String dataTableFullName = getIndexToolDataTableName(configuration); + String indexTableFullName = getIndexToolIndexTableName(configuration); + + try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) { + PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class); + Long scn = (currentScnValue != null) ? Long.valueOf(currentScnValue) : EnvironmentEdgeManager.currentTimeMillis(); + PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName); + ServerBuildIndexCompiler compiler = + new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName); + MutationPlan plan = compiler.compile(indexTable); + Scan scan = plan.getContext().getScan(); + + try { + scan.setTimeRange(0, scn); + } catch (IOException e) { + throw new SQLException(e); + } + queryPlan = plan.getQueryPlan(); + // since we can't set a scn on connections with txn set TX_SCN attribute so that the max time range is set by BaseScannerRegionObserver + if (txnScnValue != null) { + scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue))); + } + + // Initialize the query plan so it sets up the parallel scans + queryPlan.iterator(MapReduceParallelScanGrouper.getInstance()); + return queryPlan; + } catch (Exception exception) { + LOG.error(String.format("Failed to get the query plan with error [%s]", + exception.getMessage())); + throw new RuntimeException(exception); + } + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index dc361c9..d1d6ca2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -28,7 +28,10 @@ import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -74,6 +77,7 @@ import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.mapreduce.CsvBulkImportUtil; +import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat; import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.DataSourceColNames; import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder; import org.apache.phoenix.mapreduce.util.ConnectionUtil; @@ -106,6 +110,22 @@ public class IndexTool extends Configured implements Tool { private static final Logger LOG = LoggerFactory.getLogger(IndexTool.class); + private String schemaName; + private String dataTable; + private String indexTable; + private boolean isPartialBuild; + private String qDataTable; + private String qIndexTable; + private boolean useDirectApi; + private boolean useSnapshot; + private 
boolean isLocalIndexBuild; + private PTable pIndexTable; + private PTable pDataTable; + private String tenantId; + private Job job; + + + private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true, "Phoenix schema name (optional)"); private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true, @@ -247,18 +267,31 @@ public class IndexTool extends Configured implements Tool { } - public Job getJob(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean isPartialBuild, - boolean useSnapshot, String tenantId) throws Exception { + public Job getJob() throws Exception { if (isPartialBuild) { - return configureJobForPartialBuild(schemaName, dataTable, tenantId); + return configureJobForPartialBuild(); } else { - return configureJobForAsyncIndex(schemaName, indexTable, dataTable, useDirectApi, useSnapshot, tenantId); + long maxTimeRange = pIndexTable.getTimeStamp() + 1; + // this is set to ensure index tables remains consistent post population. + + if (pDataTable.isTransactional()) { + configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE, + Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange))); + configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pDataTable.getTransactionProvider().name()); + } + configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, + Long.toString(maxTimeRange)); + if (useSnapshot || !useDirectApi || (!isLocalIndexBuild && pDataTable.isTransactional())) { + return configureJobForAysncIndex(); + } + else { + //Local and non-transactional global indexes to be built on the server side + return configureJobForServerBuildIndex(); + } } } - - private Job configureJobForPartialBuild(String schemaName, String dataTable, String tenantId) throws Exception { - final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); - final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable); + + private Job configureJobForPartialBuild() throws Exception { connection = ConnectionUtil.getInputConnection(configuration); long minDisableTimestamp = HConstants.LATEST_TIMESTAMP; PTable indexWithMinDisableTimestamp = null; @@ -266,7 +299,7 @@ public class IndexTool extends Configured implements Tool { //Get Indexes in building state, minDisabledTimestamp List<String> disableIndexes = new ArrayList<String>(); List<PTable> disabledPIndexes = new ArrayList<PTable>(); - for (PTable index : pdataTable.getIndexes()) { + for (PTable index : pDataTable.getIndexes()) { if (index.getIndexState().equals(PIndexState.BUILDING)) { disableIndexes.add(index.getTableName().getString()); disabledPIndexes.add(index); @@ -299,10 +332,10 @@ public class IndexTool extends Configured implements Tool { //serialize index maintaienr in job conf with Base64 TODO: Need to find better way to serialize them in conf. 
List<IndexMaintainer> maintainers = Lists.newArrayListWithExpectedSize(disabledPIndexes.size()); for (PTable index : disabledPIndexes) { - maintainers.add(index.getIndexMaintainer(pdataTable, connection.unwrap(PhoenixConnection.class))); + maintainers.add(index.getIndexMaintainer(pDataTable, connection.unwrap(PhoenixConnection.class))); } ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY); - IndexMaintainer.serializeAdditional(pdataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class)); + IndexMaintainer.serializeAdditional(pDataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class)); PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr); if (tenantId != null) { PhoenixConfigurationUtil.setTenantId(configuration, tenantId); @@ -313,15 +346,15 @@ public class IndexTool extends Configured implements Tool { scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp); scan.setRaw(true); scan.setCacheBlocks(false); - if (pdataTable.isTransactional()) { - long maxTimeRange = pdataTable.getTimeStamp() + 1; + if (pDataTable.isTransactional()) { + long maxTimeRange = pDataTable.getTimeStamp() + 1; scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange))))); } - String physicalTableName=pdataTable.getPhysicalName().getString(); - final String jobName = String.format("Phoenix Indexes build for " + pdataTable.getName().toString()); + String physicalTableName=pDataTable.getPhysicalName().getString(); + final String jobName = String.format("Phoenix Indexes build for " + pDataTable.getName().toString()); PhoenixConfigurationUtil.setInputTableName(configuration, qDataTable); PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalTableName); @@ -338,7 +371,7 @@ public class IndexTool extends Configured implements Tool { null, job); TableMapReduceUtil.initCredentials(job); TableInputFormat.configureSplitTable(job, TableName.valueOf(physicalTableName)); - return configureSubmittableJobUsingDirectApi(job, true); + return configureSubmittableJobUsingDirectApi(job); } private long getMaxRebuildAsyncDate(String schemaName, List<String> disableIndexes) throws SQLException { @@ -368,40 +401,15 @@ public class IndexTool extends Configured implements Tool { } - private Job configureJobForAsyncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot, String tenantId) - throws Exception { - final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); - final String qIndexTable; - if (schemaName != null && !schemaName.isEmpty()) { - qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable); - } else { - qIndexTable = indexTable; - } - final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable); - - final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable); - - long maxTimeRange = pindexTable.getTimeStamp() + 1; - // this is set to ensure index tables remains consistent post population. 
+ private Job configureJobForAysncIndex() - if (pdataTable.isTransactional()) { - configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE, - Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange))); - configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pdataTable.getTransactionProvider().name()); - } - configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, - Long.toString(maxTimeRange)); - - // check if the index type is LOCAL, if so, derive and set the physicalIndexName that is - // computed from the qDataTable name. - String physicalIndexTable = pindexTable.getPhysicalName().getString(); - + throws Exception { + String physicalIndexTable = pIndexTable.getPhysicalName().getString(); final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class); final PostIndexDDLCompiler ddlCompiler = - new PostIndexDDLCompiler(pConnection, new TableRef(pdataTable)); - ddlCompiler.compile(pindexTable); - + new PostIndexDDLCompiler(pConnection, new TableRef(pDataTable)); + ddlCompiler.compile(pIndexTable); final List<String> indexColumns = ddlCompiler.getIndexColumnNames(); final String selectQuery = ddlCompiler.getSelectQuery(); final String upsertQuery = @@ -410,6 +418,7 @@ public class IndexTool extends Configured implements Tool { configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery); PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable); PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable); + PhoenixConfigurationUtil.setUpsertColumnNames(configuration, indexColumns.toArray(new String[indexColumns.size()])); if (tenantId != null) { @@ -419,25 +428,22 @@ public class IndexTool extends Configured implements Tool { PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns); ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList); + fs = outputPath.getFileSystem(configuration); + fs.delete(outputPath, true); final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable); final Job job = Job.getInstance(configuration, jobName); job.setJarByClass(IndexTool.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); - if (outputPath != null) { - fs = outputPath.getFileSystem(configuration); - fs.delete(outputPath, true); - FileOutputFormat.setOutputPath(job, outputPath); - } + FileOutputFormat.setOutputPath(job, outputPath); if (!useSnapshot) { - PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable, - selectQuery); + PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable, selectQuery); } else { HBaseAdmin admin = null; String snapshotName; try { admin = pConnection.getQueryServices().getAdmin(); - String pdataTableName = pdataTable.getName().getString(); + String pdataTableName = pDataTable.getName().getString(); snapshotName = new StringBuilder(pdataTableName).append("-Snapshot").toString(); admin.snapshot(snapshotName, TableName.valueOf(pdataTableName)); } finally { @@ -452,17 +458,47 @@ public class IndexTool extends Configured implements Tool { // set input for map reduce job using hbase snapshots PhoenixMapReduceUtil - .setInput(job, PhoenixIndexDBWritable.class, snapshotName, qDataTable, restoreDir, selectQuery); + .setInput(job, PhoenixIndexDBWritable.class, snapshotName, qDataTable, restoreDir, selectQuery); } TableMapReduceUtil.initCredentials(job); if (useDirectApi) { - return configureSubmittableJobUsingDirectApi(job, false); + 
job.setMapperClass(PhoenixIndexImportDirectMapper.class); + return configureSubmittableJobUsingDirectApi(job); } else { return configureRunnableJobUsingBulkLoad(job, outputPath); - } - + } + + private Job configureJobForServerBuildIndex() + throws Exception { + + PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable); + PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable); + + String physicalIndexTable = pIndexTable.getPhysicalName().getString(); + + PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable); + PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable); + if (tenantId != null) { + PhoenixConfigurationUtil.setTenantId(configuration, tenantId); + } + + fs = outputPath.getFileSystem(configuration); + fs.delete(outputPath, true); + + final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable); + final Job job = Job.getInstance(configuration, jobName); + job.setJarByClass(IndexTool.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + FileOutputFormat.setOutputPath(job, outputPath); + + PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class, + qDataTable, ""); + + TableMapReduceUtil.initCredentials(job); + job.setMapperClass(PhoenixServerBuildIndexMapper.class); + return configureSubmittableJobUsingDirectApi(job); } /** @@ -496,12 +532,9 @@ public class IndexTool extends Configured implements Tool { * @return * @throws Exception */ - private Job configureSubmittableJobUsingDirectApi(Job job, boolean isPartialRebuild) + private Job configureSubmittableJobUsingDirectApi(Job job) throws Exception { - if (!isPartialRebuild) { - //Don't configure mapper for partial build as it is configured already - job.setMapperClass(PhoenixIndexImportDirectMapper.class); - } + job.setReducerClass(PhoenixIndexImportDirectReducer.class); Configuration conf = job.getConfiguration(); HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); @@ -520,6 +553,10 @@ public class IndexTool extends Configured implements Tool { } + public Job getJob() { + return job; + } + @Override public int run(String[] args) throws Exception { Connection connection = null; @@ -532,64 +569,75 @@ public class IndexTool extends Configured implements Tool { printHelpAndExit(e.getMessage(), getOptions()); } final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf()); - final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); - final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt()); - final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); - final boolean isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()); - final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); - boolean useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt()); - String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()); - boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); - boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt()); boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt()); - byte[][] splitKeysBeforeJob = null; - boolean isLocalIndexBuild = false; - PTable pindexTable = null; - String tenantId = null; + tenantId = null; if (useTenantId) { tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt()); configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); } 
connection = ConnectionUtil.getInputConnection(configuration); + schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); + dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt()); + indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); + isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()); + qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); + pDataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable); + useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt()); + String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()); + boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); + useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt()); + + byte[][] splitKeysBeforeJob = null; + isLocalIndexBuild = false; + pIndexTable = null; + + connection = ConnectionUtil.getInputConnection(configuration); + if (indexTable != null) { if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) { throw new IllegalArgumentException(String.format( " %s is not an index table for %s for this connection", indexTable, qDataTable)); } - pindexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty() + pIndexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty() ? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable); + + if (schemaName != null && !schemaName.isEmpty()) { + qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable); + } else { + qIndexTable = indexTable; + } htable = (HTable)connection.unwrap(PhoenixConnection.class).getQueryServices() - .getTable(pindexTable.getPhysicalName().getBytes()); - if (IndexType.LOCAL.equals(pindexTable.getIndexType())) { + .getTable(pIndexTable.getPhysicalName().getBytes()); + + if (IndexType.LOCAL.equals(pIndexTable.getIndexType())) { isLocalIndexBuild = true; splitKeysBeforeJob = htable.getRegionLocator().getStartKeys(); } // presplit the index table boolean autosplit = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt()); - boolean isSalted = pindexTable.getBucketNum() != null; // no need to split salted tables - if (!isSalted && IndexType.GLOBAL.equals(pindexTable.getIndexType()) + boolean isSalted = pIndexTable.getBucketNum() != null; // no need to split salted tables + if (!isSalted && IndexType.GLOBAL.equals(pIndexTable.getIndexType()) && (autosplit || cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt()))) { String nOpt = cmdLine.getOptionValue(AUTO_SPLIT_INDEX_OPTION.getOpt()); int autosplitNumRegions = nOpt == null ? DEFAULT_AUTOSPLIT_NUM_REGIONS : Integer.parseInt(nOpt); String rateOpt = cmdLine.getOptionValue(SPLIT_INDEX_OPTION.getOpt()); double samplingRate = rateOpt == null ? DEFAULT_SPLIT_SAMPLING_RATE : Double.parseDouble(rateOpt); LOG.info(String.format("Will split index %s , autosplit=%s , autoSplitNumRegions=%s , samplingRate=%s", indexTable, autosplit, autosplitNumRegions, samplingRate)); - splitIndexTable(connection.unwrap(PhoenixConnection.class), qDataTable, pindexTable, autosplit, autosplitNumRegions, samplingRate); + + splitIndexTable(connection.unwrap(PhoenixConnection.class), autosplit, autosplitNumRegions, samplingRate, configuration); } } - - PTable pdataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable); Path outputPath = null; FileSystem fs = null; if (basePath != null) { - outputPath = CsvBulkImportUtil.getOutputPath(new Path(basePath), pindexTable == null - ? 
pdataTable.getPhysicalName().getString() : pindexTable.getPhysicalName().getString()); + outputPath = CsvBulkImportUtil.getOutputPath(new Path(basePath), pIndexTable == null + ? pDataTable.getPhysicalName().getString() : pIndexTable.getPhysicalName().getString()); fs = outputPath.getFileSystem(configuration); fs.delete(outputPath, true); } - - Job job = new JobFactory(connection, configuration, outputPath).getJob(schemaName, indexTable, dataTable, - useDirectApi, isPartialBuild, useSnapshot, tenantId); + + job = new JobFactory(connection, configuration, outputPath).getJob(); + if (!isForeground && useDirectApi) { LOG.info("Running Index Build in Background - Submit async and exit"); job.submit(); @@ -635,32 +683,29 @@ public class IndexTool extends Configured implements Tool { } } - - private void splitIndexTable(PhoenixConnection pConnection, String qDataTable, - PTable pindexTable, boolean autosplit, int autosplitNumRegions, double samplingRate) + private void splitIndexTable(PhoenixConnection pConnection, boolean autosplit, int autosplitNumRegions, double samplingRate, Configuration configuration) throws SQLException, IOException, IllegalArgumentException, InterruptedException { - final PTable pdataTable = PhoenixRuntime.getTable(pConnection, qDataTable); int numRegions; try (HTable hDataTable = (HTable) pConnection.getQueryServices() - .getTable(pdataTable.getPhysicalName().getBytes())) { + .getTable(pDataTable.getPhysicalName().getBytes())) { numRegions = hDataTable.getRegionLocator().getStartKeys().length; if (autosplit && !(numRegions > autosplitNumRegions)) { LOG.info(String.format( "Will not split index %s because the data table only has %s regions, autoSplitNumRegions=%s", - pindexTable.getPhysicalName(), numRegions, autosplitNumRegions)); + pIndexTable.getPhysicalName(), numRegions, autosplitNumRegions)); return; // do nothing if # of regions is too low } } // build a tablesample query to fetch index column values from the data table - DataSourceColNames colNames = new DataSourceColNames(pdataTable, pindexTable); + DataSourceColNames colNames = new DataSourceColNames(pDataTable, pIndexTable); String qTableSample = String.format(qDataTable + " TABLESAMPLE(%.2f)", samplingRate); List<String> dataColNames = colNames.getDataColNames(); final String dataSampleQuery = QueryUtil.constructSelectStatement(qTableSample, dataColNames, null, Hint.NO_INDEX, true); - IndexMaintainer maintainer = IndexMaintainer.create(pdataTable, pindexTable, pConnection); + IndexMaintainer maintainer = IndexMaintainer.create(pDataTable, pIndexTable, pConnection); ImmutableBytesWritable dataRowKeyPtr = new ImmutableBytesWritable(); try (final PhoenixResultSet rs = pConnection.createStatement().executeQuery(dataSampleQuery) @@ -684,7 +729,7 @@ public class IndexTool extends Configured implements Tool { splitPoints[splitIdx++] = b.getRightBoundExclusive(); } // drop table and recreate with appropriate splits - TableName indexTN = TableName.valueOf(pindexTable.getPhysicalName().getBytes()); + TableName indexTN = TableName.valueOf(pIndexTable.getPhysicalName().getBytes()); HTableDescriptor descriptor = admin.getTableDescriptor(indexTN); admin.disableTable(indexTN); admin.deleteTable(indexTN); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java new file mode 100644 index 0000000..34bcc9b --- /dev/null +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixServerBuildIndexMapper.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.index; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.PhoenixJobCounters; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixRuntime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mapper that does not do much as regions servers actually build the index from the data table regions directly + */ +public class PhoenixServerBuildIndexMapper extends + Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, IntWritable> { + + private static final Logger LOG = LoggerFactory.getLogger(PhoenixServerBuildIndexMapper.class); + + @Override + protected void setup(final Context context) throws IOException, InterruptedException { + super.setup(context); + } + + @Override + protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context) + throws IOException, InterruptedException { + context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1); + // Make sure progress is reported to Application Master. 
+ context.progress(); + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()), new IntWritable(0)); + super.cleanup(context); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index b81394b..1cac3db 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -122,6 +122,10 @@ public final class PhoenixConfigurationUtil { public static final String SCRUTINY_INDEX_TABLE_NAME = "phoenix.mr.scrutiny.index.table.name"; + public static final String INDEX_TOOL_DATA_TABLE_NAME = "phoenix.mr.index_tool.data.table.name"; + + public static final String INDEX_TOOL_INDEX_TABLE_NAME = "phoenix.mr.index_tool.index.table.name"; + public static final String SCRUTINY_SOURCE_TABLE = "phoenix.mr.scrutiny.source.table"; public static final String SCRUTINY_BATCH_SIZE = "phoenix.mr.scrutiny.batch.size"; @@ -545,6 +549,16 @@ public final class PhoenixConfigurationUtil { Preconditions.checkNotNull(configuration); return configuration.get(SCRUTINY_INDEX_TABLE_NAME); } + public static void setIndexToolDataTableName(Configuration configuration, String qDataTableName) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(qDataTableName); + configuration.set(INDEX_TOOL_DATA_TABLE_NAME, qDataTableName); + } + + public static String getIndexToolDataTableName(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return configuration.get(INDEX_TOOL_DATA_TABLE_NAME); + } public static void setScrutinyIndexTable(Configuration configuration, String qIndexTableName) { Preconditions.checkNotNull(configuration); @@ -557,6 +571,17 @@ public final class PhoenixConfigurationUtil { return SourceTable.valueOf(configuration.get(SCRUTINY_SOURCE_TABLE)); } + public static void setIndexToolIndexTableName(Configuration configuration, String qIndexTableName) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(qIndexTableName); + configuration.set(INDEX_TOOL_INDEX_TABLE_NAME, qIndexTableName); + } + + public static String getIndexToolIndexTableName(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return configuration.get(INDEX_TOOL_INDEX_TABLE_NAME); + } + public static void setScrutinySourceTable(Configuration configuration, SourceTable sourceTable) { Preconditions.checkNotNull(configuration); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java index 3462177..bab6cee 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java @@ -19,6 +19,7 @@ package org.apache.phoenix.mapreduce.util; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.phoenix.mapreduce.PhoenixInputFormat; @@ -70,6 +71,23 @@ public final class PhoenixMapReduceUtil { /** * * @param job + * @param 
inputClass DBWritable class + * @param inputFormatClass InputFormat class + * @param tableName Input table name + * @param inputQuery Select query + */ + + public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, + final Class<? extends InputFormat> inputFormatClass, + final String tableName, final String inputQuery) { + final Configuration configuration = setInput(job, inputClass, inputFormatClass, tableName); + PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery); + PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY); + } + + /** + * + * @param job * @param inputClass DBWritable class * @param snapshotName The name of a snapshot (of a table) to read from * @param tableName Input table name @@ -135,6 +153,15 @@ public final class PhoenixMapReduceUtil { return configuration; } + private static Configuration setInput(final Job job, final Class<? extends DBWritable> inputClass, + final Class<? extends InputFormat> inputFormatClass, final String tableName){ + job.setInputFormatClass(inputFormatClass); + final Configuration configuration = job.getConfiguration(); + PhoenixConfigurationUtil.setInputTableName(configuration, tableName); + PhoenixConfigurationUtil.setInputClass(configuration,inputClass); + return configuration; + } + /** * A method to override which HBase cluster for {@link PhoenixInputFormat} to read from * @param job MapReduce Job diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 67a4928..269d121 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -156,6 +156,7 @@ import org.apache.phoenix.compile.PostDDLCompiler; import org.apache.phoenix.compile.PostIndexDDLCompiler; import org.apache.phoenix.compile.PostLocalIndexDDLCompiler; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.ServerBuildIndexCompiler; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.compile.StatementNormalizer; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; @@ -1401,16 +1402,17 @@ public class MetaDataClient { } private MutationPlan getMutationPlanForBuildingIndex(PTable index, TableRef dataTableRef) throws SQLException { - MutationPlan mutationPlan; if (index.getIndexType() == IndexType.LOCAL) { PostLocalIndexDDLCompiler compiler = new PostLocalIndexDDLCompiler(connection, getFullTableName(dataTableRef)); - mutationPlan = compiler.compile(index); - } else { + return compiler.compile(index); + } else if (dataTableRef.getTable().isTransactional()){ PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef); - mutationPlan = compiler.compile(index); + return compiler.compile(index); + } else { + ServerBuildIndexCompiler compiler = new ServerBuildIndexCompiler(connection, getFullTableName(dataTableRef)); + return compiler.compile(index); } - return mutationPlan; } private MutationState buildIndex(PTable index, TableRef dataTableRef) throws SQLException { @@ -1751,6 +1753,10 @@ public class MetaDataClient { if (connection.getSCN() != null) { return buildIndexAtTimeStamp(table, statement.getTable()); } + + String dataTableFullName = SchemaUtil.getTableName( + tableRef.getTable().getSchemaName().getString(), + tableRef.getTable().getTableName().getString()); return buildIndex(table, tableRef); }
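
To summarize the build-path routing this patch introduces: with the direct API, local indexes and global indexes on non-transactional tables are now built server-side by the region servers, while snapshot-based builds and global indexes on transactional tables keep the client-driven direct-import mapper, and bulk-load builds keep the original import mapper. A minimal sketch of that rule, distilled from the assertions in IndexToolIT.verifyMapper above (the class and method names here are illustrative, not part of the patch):

import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
import org.apache.phoenix.mapreduce.index.PhoenixIndexImportMapper;
import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;

final class MapperRoutingSketch {
    // Returns the mapper class IndexTool is expected to configure for a given
    // combination of flags, mirroring IndexToolIT.verifyMapper.
    static Class<?> expectedMapper(boolean directApi, boolean useSnapshot,
                                   boolean localIndex, boolean transactional) {
        if (!directApi) {
            // Bulk-load builds still use the original import mapper.
            return PhoenixIndexImportMapper.class;
        }
        if ((localIndex || !transactional) && !useSnapshot) {
            // Local indexes and non-transactional global indexes are built
            // on the server side via PhoenixServerBuildIndexMapper.
            return PhoenixServerBuildIndexMapper.class;
        }
        // Snapshot reads and transactional global indexes keep the
        // client-driven direct-import path.
        return PhoenixIndexImportDirectMapper.class;
    }
}

For example, expectedMapper(true, false, false, false) — a direct-API build of a global index on a non-transactional table — yields PhoenixServerBuildIndexMapper; the same preference for server-side builds appears in MetaDataClient.getMutationPlanForBuildingIndex, which now routes that case to ServerBuildIndexCompiler.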