http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java index e2af717,364b358..aee729f --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java @@@ -73,11 -71,9 +73,12 @@@ import org.apache.phoenix.util.SchemaUt import org.apache.phoenix.util.StringUtil; import org.junit.After; import org.junit.Before; + import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; /** * * Test for failure of region server to write to index table. @@@ -153,277 -132,233 +154,281 @@@ public class MutableIndexFailureIT exte } } + @Ignore("See PHOENIX-2331") @Test(timeout=300000) public void testWriteFailureDisablesLocalIndex() throws Exception { - testWriteFailureDisablesIndex(true); + helpTestWriteFailureDisablesIndex(true); } - + + @Ignore("See PHOENIX-2332") @Test(timeout=300000) - public void testWriteFailureDisablesIndex() throws Exception { - testWriteFailureDisablesIndex(false); + public void testWriteFailureDisablesGlobalIndex() throws Exception { + helpTestWriteFailureDisablesIndex(false); } - - public void testWriteFailureDisablesIndex(boolean localIndex) throws Exception { - String query; - ResultSet rs; + private void helpTestWriteFailureDisablesIndex(boolean localIndex) throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = driver.connect(url, props); - conn.setAutoCommit(false); - conn.createStatement().execute( - "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); - query = "SELECT * FROM " + DATA_TABLE_FULL_NAME; - rs = conn.createStatement().executeQuery(query); - assertFalse(rs.next()); - - if(localIndex) { - conn.createStatement().execute( - "CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)"); - conn.createStatement().execute( - "CREATE LOCAL INDEX " + INDEX_TABLE_NAME+ "_2" + " ON " + DATA_TABLE_FULL_NAME + " (v2) INCLUDE (v1)"); - } else { - conn.createStatement().execute( - "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)"); + try (Connection conn = driver.connect(url, props);) { + String query; + ResultSet rs; + conn.setAutoCommit(false); + conn.createStatement().execute( + "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions); + query = "SELECT * FROM " + DATA_TABLE_FULL_NAME; + rs = conn.createStatement().executeQuery(query); + assertFalse(rs.next()); + + if(localIndex) { + conn.createStatement().execute( + "CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)"); + conn.createStatement().execute( + "CREATE LOCAL INDEX " + INDEX_TABLE_NAME+ "_2" + " ON " + DATA_TABLE_FULL_NAME + " (v2) INCLUDE (v1)"); + } else { + conn.createStatement().execute( + "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)"); + } + + query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME; + rs = conn.createStatement().executeQuery(query); + assertFalse(rs.next()); + + // Verify the metadata for index is correct. + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, + new String[] { PTableType.INDEX.toString() }); + assertTrue(rs.next()); + assertEquals(INDEX_TABLE_NAME, rs.getString(3)); + assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); + assertFalse(rs.next()); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); + stmt.setString(1, "a"); + stmt.setString(2, "x"); + stmt.setString(3, "1"); + stmt.execute(); + conn.commit(); + + TableName indexTable = + TableName.valueOf(localIndex ? MetaDataUtil + .getLocalIndexTableName(DATA_TABLE_FULL_NAME) : INDEX_TABLE_FULL_NAME); + HBaseAdmin admin = this.util.getHBaseAdmin(); + HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable); + try{ + admin.disableTable(indexTable); + admin.deleteTable(indexTable); + } catch (TableNotFoundException ignore) {} + + stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); + stmt.setString(1, "a2"); + stmt.setString(2, "x2"); + stmt.setString(3, "2"); + stmt.execute(); + + if (transactional) { + try { + conn.commit(); + fail(); + } catch (SQLException e1) { + try { + conn.rollback(); + fail(); + } catch (SQLException e2) { + // rollback fails as well because index is disabled + } + } + } + else { + conn.commit(); + } + + // Verify the metadata for index is correct. + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, + new String[] { PTableType.INDEX.toString() }); + assertTrue(rs.next()); + assertEquals(INDEX_TABLE_NAME, rs.getString(3)); + // if the table is transactional, the index will not be disabled if there is a failure + PIndexState indexState = transactional ? PIndexState.ACTIVE : PIndexState.DISABLE; + assertEquals(indexState.toString(), rs.getString("INDEX_STATE")); + assertFalse(rs.next()); + if(localIndex) { + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2", + new String[] { PTableType.INDEX.toString() }); + assertTrue(rs.next()); + assertEquals(INDEX_TABLE_NAME+"_2", rs.getString(3)); + assertEquals(indexState.toString(), rs.getString("INDEX_STATE")); + assertFalse(rs.next()); + } + + // if the table is transactional the write to the index table will fail because the index has not been disabled + if (!transactional) { + // Verify UPSERT on data table still work after index is disabled + stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); + stmt.setString(1, "a3"); + stmt.setString(2, "x3"); + stmt.setString(3, "3"); + stmt.execute(); + conn.commit(); + } + + if (transactional) { + // if the table was transactional there should be 1 row (written before the index was disabled) + query = "SELECT /*+ NO_INDEX */ v2 FROM " + DATA_TABLE_FULL_NAME; + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME; + assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs)); + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("1", rs.getString(1)); + assertFalse(rs.next()); + } else { + // if the table was not transactional there should be three rows (all writes to data table should succeed) + query = "SELECT v2 FROM " + DATA_TABLE_FULL_NAME; + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME; + assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs)); + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("1", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("2", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("3", rs.getString(1)); + assertFalse(rs.next()); + } + + // recreate index table + admin.createTable(indexTableDesc); + do { + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, + new String[] { PTableType.INDEX.toString() }); + assertTrue(rs.next()); + if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){ + break; + } + if(localIndex) { + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2", + new String[] { PTableType.INDEX.toString() }); + assertTrue(rs.next()); + if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){ + break; + } + } + Thread.sleep(15 * 1000); // sleep 15 secs + } while(true); + + // Verify UPSERT on data table still work after index table is recreated + stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); + stmt.setString(1, "a4"); + stmt.setString(2, "x4"); + stmt.setString(3, "4"); + stmt.execute(); + conn.commit(); + + // verify index table has data + query = "SELECT count(1) FROM " + INDEX_TABLE_FULL_NAME; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + + // for txn tables there will be only one row in the index (a4) + // for non txn tables there will be three rows because we only partially build index from where we failed and the oldest + // index row has been deleted when we dropped the index table during test + assertEquals( transactional ? 1: 3, rs.getInt(1)); } - - query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME; - rs = conn.createStatement().executeQuery(query); - assertFalse(rs.next()); - - // Verify the metadata for index is correct. - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - assertEquals(INDEX_TABLE_NAME, rs.getString(3)); - assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); - assertFalse(rs.next()); - - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); - stmt.setString(1, "a"); - stmt.setString(2, "x"); - stmt.setString(3, "1"); - stmt.execute(); - conn.commit(); - - TableName indexTable = - TableName.valueOf(localIndex ? MetaDataUtil - .getLocalIndexTableName(DATA_TABLE_FULL_NAME) : INDEX_TABLE_FULL_NAME); - HBaseAdmin admin = this.util.getHBaseAdmin(); - HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable); - try{ - admin.disableTable(indexTable); - admin.deleteTable(indexTable); - } catch (TableNotFoundException ignore) {} - - stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); - stmt.setString(1, "a2"); - stmt.setString(2, "x2"); - stmt.setString(3, "2"); - stmt.execute(); - try { - conn.commit(); - } catch (SQLException e) {} - - // Verify the metadata for index is correct. - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - assertEquals(INDEX_TABLE_NAME, rs.getString(3)); - assertEquals(PIndexState.DISABLE.toString(), rs.getString("INDEX_STATE")); - assertFalse(rs.next()); - if(localIndex) { - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2", - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - assertEquals(INDEX_TABLE_NAME+"_2", rs.getString(3)); - assertEquals(PIndexState.DISABLE.toString(), rs.getString("INDEX_STATE")); - assertFalse(rs.next()); - } - - // Verify UPSERT on data table still work after index is disabled - stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); - stmt.setString(1, "a3"); - stmt.setString(2, "x3"); - stmt.setString(3, "3"); - stmt.execute(); - conn.commit(); - - query = "SELECT v2 FROM " + DATA_TABLE_FULL_NAME + " where v1='x3'"; - rs = conn.createStatement().executeQuery("EXPLAIN " + query); - assertTrue(QueryUtil.getExplainPlan(rs).contains("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME)); - rs = conn.createStatement().executeQuery(query); - assertTrue(rs.next()); - - // recreate index table - admin.createTable(indexTableDesc); - do { - Thread.sleep(15 * 1000); // sleep 15 secs - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){ - break; - } - if(localIndex) { - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2", - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){ - break; - } - } - } while(true); - - // verify index table has data - query = "SELECT count(1) FROM " + INDEX_TABLE_FULL_NAME; - rs = conn.createStatement().executeQuery(query); - assertTrue(rs.next()); - - // using 2 here because we only partially build index from where we failed and the oldest - // index row has been deleted when we dropped the index table during test. - assertEquals(2, rs.getInt(1)); } @Test(timeout=300000) public void testWriteFailureWithRegionServerDown() throws Exception { - String query; - ResultSet rs; - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = driver.connect(url, props); - conn.setAutoCommit(false); - conn.createStatement().execute( - "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); - query = "SELECT * FROM " + DATA_TABLE_FULL_NAME; - rs = conn.createStatement().executeQuery(query); - assertFalse(rs.next()); - - conn.createStatement().execute( - "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)"); - query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME; - rs = conn.createStatement().executeQuery(query); - assertFalse(rs.next()); - - // Verify the metadata for index is correct. - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - assertEquals(INDEX_TABLE_NAME, rs.getString(3)); - assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); - assertFalse(rs.next()); - - PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); - stmt.setString(1, "a"); - stmt.setString(2, "x"); - stmt.setString(3, "1"); - stmt.execute(); - conn.commit(); - - // find a RS which doesn't has CATALOG table - TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG"); - TableName indexTable = TableName.valueOf(INDEX_TABLE_FULL_NAME); - final HBaseCluster cluster = this.util.getHBaseCluster(); - Collection<ServerName> rss = cluster.getClusterStatus().getServers(); - HBaseAdmin admin = this.util.getHBaseAdmin(); - List<HRegionInfo> regions = admin.getTableRegions(catalogTable); - ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getTable(), - regions.get(0).getRegionName()); - ServerName metaRS = cluster.getServerHoldingMeta(); - ServerName rsToBeKilled = null; - - // find first RS isn't holding META or CATALOG table - for(ServerName curRS : rss) { - if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) { - rsToBeKilled = curRS; - break; - } + try (Connection conn = driver.connect(url, props);) { + String query; + ResultSet rs; + conn.setAutoCommit(false); + conn.createStatement().execute( + "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"); + query = "SELECT * FROM " + DATA_TABLE_FULL_NAME; + rs = conn.createStatement().executeQuery(query); + assertFalse(rs.next()); + + conn.createStatement().execute( + "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)"); + query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME; + rs = conn.createStatement().executeQuery(query); + assertFalse(rs.next()); + + // Verify the metadata for index is correct. + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, + new String[] { PTableType.INDEX.toString() }); + assertTrue(rs.next()); + assertEquals(INDEX_TABLE_NAME, rs.getString(3)); + assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE")); + assertFalse(rs.next()); + + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)"); + stmt.setString(1, "a"); + stmt.setString(2, "x"); + stmt.setString(3, "1"); + stmt.execute(); + conn.commit(); + + // find a RS which doesn't has CATALOG table + TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG"); + TableName indexTable = TableName.valueOf(INDEX_TABLE_FULL_NAME); + final HBaseCluster cluster = this.util.getHBaseCluster(); + Collection<ServerName> rss = cluster.getClusterStatus().getServers(); + HBaseAdmin admin = this.util.getHBaseAdmin(); + List<HRegionInfo> regions = admin.getTableRegions(catalogTable); - ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getRegionName()); ++ ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getTable(), ++ regions.get(0).getRegionName()); + ServerName metaRS = cluster.getServerHoldingMeta(); + ServerName rsToBeKilled = null; + + // find first RS isn't holding META or CATALOG table + for(ServerName curRS : rss) { + if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) { + rsToBeKilled = curRS; + break; + } + } + assertTrue(rsToBeKilled != null); + + regions = admin.getTableRegions(indexTable); + final HRegionInfo indexRegion = regions.get(0); + final ServerName dstRS = rsToBeKilled; + admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName())); + this.util.waitFor(30000, 200, new Waiter.Predicate<Exception>() { + @Override + public boolean evaluate() throws Exception { - ServerName sn = cluster.getServerHoldingRegion(indexRegion.getRegionName()); ++ ServerName sn = cluster.getServerHoldingRegion(indexRegion.getTable(), ++ indexRegion.getRegionName()); + return (sn != null && sn.equals(dstRS)); + } + }); + + // use timer sending updates in every 10ms + this.scheduleTimer = new Timer(true); + this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn), 0, 10); + // let timer sending some updates + Thread.sleep(100); + + // kill RS hosting index table + this.util.getHBaseCluster().killRegionServer(rsToBeKilled); + + // wait for index table completes recovery + this.util.waitUntilAllRegionsAssigned(indexTable); + + // Verify the metadata for index is correct. + do { + Thread.sleep(15 * 1000); // sleep 15 secs + rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, + new String[] { PTableType.INDEX.toString() }); + assertTrue(rs.next()); + if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){ + break; + } + } while(true); + this.scheduleTimer.cancel(); + + assertEquals(cluster.getClusterStatus().getDeadServers(), 1); } - assertTrue(rsToBeKilled != null); - - regions = admin.getTableRegions(indexTable); - final HRegionInfo indexRegion = regions.get(0); - final ServerName dstRS = rsToBeKilled; - admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName())); - this.util.waitFor(30000, 200, new Waiter.Predicate<Exception>() { - @Override - public boolean evaluate() throws Exception { - ServerName sn = cluster.getServerHoldingRegion(indexRegion.getTable(), - indexRegion.getRegionName()); - return (sn != null && sn.equals(dstRS)); - } - }); - - // use timer sending updates in every 10ms - this.scheduleTimer = new Timer(true); - this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn), 0, 10); - // let timer sending some updates - Thread.sleep(100); - - // kill RS hosting index table - this.util.getHBaseCluster().killRegionServer(rsToBeKilled); - - // wait for index table completes recovery - this.util.waitUntilAllRegionsAssigned(indexTable); - - // Verify the metadata for index is correct. - do { - Thread.sleep(15 * 1000); // sleep 15 secs - rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME, - new String[] { PTableType.INDEX.toString() }); - assertTrue(rs.next()); - if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){ - break; - } - } while(true); - this.scheduleTimer.cancel(); - - assertEquals(cluster.getClusterStatus().getDeadServers(), 1); } static class SendingUpdatesScheduleTask extends TimerTask {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java index 0000000,e0f0a3c..b3b3316 mode 000000,100644..100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java @@@ -1,0 -1,318 +1,318 @@@ + /* + * Copyright 2014 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you maynot use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicablelaw or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.phoenix.execute; + + import static com.google.common.collect.Lists.newArrayList; + import static com.google.common.collect.Sets.newHashSet; + import static java.util.Collections.singletonList; + import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver; + import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; + import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; + import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; + import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; + import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; + import static org.apache.phoenix.util.TestUtil.LOCALHOST; + import static org.junit.Assert.assertArrayEquals; + import static org.junit.Assert.assertEquals; + import static org.junit.Assert.fail; + + import java.io.IOException; + import java.sql.Connection; + import java.sql.Driver; + import java.sql.ResultSet; + import java.sql.SQLException; + import java.sql.Statement; + import java.util.Comparator; + import java.util.List; + import java.util.Map; + import java.util.Properties; + + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.hbase.DoNotRetryIOException; + import org.apache.hadoop.hbase.HBaseIOException; + import org.apache.hadoop.hbase.HBaseTestingUtility; + import org.apache.hadoop.hbase.client.Delete; + import org.apache.hadoop.hbase.client.Durability; + import org.apache.hadoop.hbase.client.Put; + import org.apache.hadoop.hbase.coprocessor.ObserverContext; + import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; + import org.apache.hadoop.hbase.coprocessor.RegionObserver; + import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; + import org.apache.hadoop.hbase.regionserver.wal.WALEdit; + import org.apache.hadoop.hbase.util.Bytes; + import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; + import org.apache.phoenix.hbase.index.Indexer; + import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; + import org.apache.phoenix.jdbc.PhoenixConnection; + import org.apache.phoenix.query.QueryServices; + import org.apache.phoenix.schema.TableRef; + import org.apache.phoenix.util.PhoenixRuntime; + import org.apache.phoenix.util.ReadOnlyProps; + import org.junit.AfterClass; + import org.junit.BeforeClass; + import org.junit.Test; + import org.junit.experimental.categories.Category; + + import com.google.common.base.Preconditions; + import com.google.common.collect.Maps; + + @Category(NeedsOwnMiniClusterTest.class) + public class PartialCommitIT { + + private static final String TABLE_NAME_TO_FAIL = "b_failure_table".toUpperCase(); + private static final byte[] ROW_TO_FAIL = Bytes.toBytes("fail me"); + private static final String UPSERT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')"; + private static final String UPSERT_SELECT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " select k, c from a_success_table"; + private static final String DELETE_TO_FAIL = "delete from " + TABLE_NAME_TO_FAIL + " where k='z'"; + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static String url; + private static Driver driver; + private static final Properties props = new Properties(); + + static { + props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10); + } + + @BeforeClass + public static void setupCluster() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + setUpConfigForMiniCluster(conf); + conf.setClass("hbase.coprocessor.region.classes", FailingRegionObserver.class, RegionObserver.class); + conf.setBoolean("hbase.coprocessor.abortonerror", false); + conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false); + TEST_UTIL.startMiniCluster(); + String clientPort = TEST_UTIL.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB); + url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort + + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM; + + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + // Must update config before starting server + props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true)); + driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator())); + createTablesWithABitOfData(); + } + + private static void createTablesWithABitOfData() throws Exception { + Properties props = new Properties(); + props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10); + + try (Connection con = driver.connect(url, new Properties())) { + Statement sta = con.createStatement(); + sta.execute("create table a_success_table (k varchar primary key, c varchar)"); + sta.execute("create table b_failure_table (k varchar primary key, c varchar)"); + sta.execute("create table c_success_table (k varchar primary key, c varchar)"); + con.commit(); + } + + props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100); + + try (Connection con = driver.connect(url, new Properties())) { + con.setAutoCommit(false); + Statement sta = con.createStatement(); + for (String table : newHashSet("a_success_table", TABLE_NAME_TO_FAIL, "c_success_table")) { + sta.execute("upsert into " + table + " values ('z', 'z')"); + sta.execute("upsert into " + table + " values ('zz', 'zz')"); + sta.execute("upsert into " + table + " values ('zzz', 'zzz')"); + } + con.commit(); + } + } + + @AfterClass + public static void teardownCluster() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testNoFailure() { + testPartialCommit(singletonList("upsert into a_success_table values ('testNoFailure', 'a')"), 0, new int[0], false, + singletonList("select count(*) from a_success_table where k='testNoFailure'"), singletonList(new Integer(1))); + } + + @Test + public void testUpsertFailure() { + testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertFailure1', 'a')", + UPSERT_TO_FAIL, + "upsert into a_success_table values ('testUpsertFailure2', 'b')"), + 1, new int[]{1}, true, + newArrayList("select count(*) from a_success_table where k like 'testUpsertFailure_'", + "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), + newArrayList(new Integer(2), new Integer(0))); + } + + @Test + public void testUpsertSelectFailure() throws SQLException { + props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100); + + try (Connection con = driver.connect(url, new Properties())) { + con.createStatement().execute("upsert into a_success_table values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')"); + con.commit(); + } + + testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertSelectFailure', 'a')", + UPSERT_SELECT_TO_FAIL), + 1, new int[]{1}, true, + newArrayList("select count(*) from a_success_table where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL) + "')", + "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), + newArrayList(new Integer(2), new Integer(0))); + } + + @Test + public void testDeleteFailure() { + testPartialCommit(newArrayList("upsert into a_success_table values ('testDeleteFailure1', 'a')", + DELETE_TO_FAIL, + "upsert into a_success_table values ('testDeleteFailure2', 'b')"), + 1, new int[]{1}, true, + newArrayList("select count(*) from a_success_table where k like 'testDeleteFailure_'", + "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"), + newArrayList(new Integer(2), new Integer(1))); + } + + /** + * {@link MutationState} keeps mutations ordered lexicographically by table name. + */ + @Test + public void testOrderOfMutationsIsPredicatable() { + testPartialCommit(newArrayList("upsert into c_success_table values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order + UPSERT_TO_FAIL, + "upsert into a_success_table values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order + 2, new int[]{0,1}, true, + newArrayList("select count(*) from c_success_table where k='testOrderOfMutationsIsPredicatable'", + "select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable'", + "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), + newArrayList(new Integer(0), new Integer(1), new Integer(0))); + } + + @Test + public void checkThatAllStatementTypesMaintainOrderInConnection() { + testPartialCommit(newArrayList("upsert into a_success_table values ('k', 'checkThatAllStatementTypesMaintainOrderInConnection')", + "upsert into a_success_table select k, c from c_success_table", + DELETE_TO_FAIL, + "select * from a_success_table", + UPSERT_TO_FAIL), + 2, new int[]{2,4}, true, + newArrayList("select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection + "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + ROW_TO_FAIL + "'", + "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"), + newArrayList(new Integer(4), new Integer(0), new Integer(1))); + } + + private void testPartialCommit(List<String> statements, int failureCount, int[] expectedUncommittedStatementIndexes, boolean willFail, + List<String> countStatementsForVerification, List<Integer> expectedCountsForVerification) { + Preconditions.checkArgument(countStatementsForVerification.size() == expectedCountsForVerification.size()); + + try (Connection con = getConnectionWithTableOrderPreservingMutationState()) { + con.setAutoCommit(false); + Statement sta = con.createStatement(); + for (String statement : statements) { + sta.execute(statement); + } + try { + con.commit(); + if (willFail) { + fail("Expected at least one statement in the list to fail"); + } else { + assertEquals(0, con.unwrap(PhoenixConnection.class).getStatementExecutionCounter()); // should have been reset to 0 in commit() + } + } catch (SQLException sqle) { + if (!willFail) { + fail("Expected no statements to fail"); + } + assertEquals(CommitException.class, sqle.getClass()); + int[] uncommittedStatementIndexes = ((CommitException)sqle).getUncommittedStatementIndexes(); + assertEquals(failureCount, uncommittedStatementIndexes.length); + assertArrayEquals(expectedUncommittedStatementIndexes, uncommittedStatementIndexes); + } + + // verify data in HBase + for (int i = 0; i < countStatementsForVerification.size(); i++) { + String countStatement = countStatementsForVerification.get(i); + ResultSet rs = sta.executeQuery(countStatement); + if (!rs.next()) { + fail("Expected a single row from count query"); + } + assertEquals(expectedCountsForVerification.get(i).intValue(), rs.getInt(1)); + } + } catch (SQLException e) { + fail(e.toString()); + } + } + + private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException { + Connection con = driver.connect(url, new Properties()); + PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class)); + final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator()); + return new PhoenixConnection(phxCon) { + @Override + protected MutationState newMutationState(int maxSize) { - return new MutationState(maxSize, this, mutations); ++ return new MutationState(maxSize, this, mutations, null); + }; + }; + } + + public static class FailingRegionObserver extends SimpleRegionObserver { + @Override + public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, + final Durability durability) throws HBaseIOException { + if (shouldFailUpsert(c, put)) { + // throwing anything other than instances of IOException result + // in this coprocessor being unloaded + // DoNotRetryIOException tells HBase not to retry this mutation + // multiple times + throw new DoNotRetryIOException(); + } + } + + @Override + public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, + Delete delete, WALEdit edit, Durability durability) throws IOException { + if (shouldFailDelete(c, delete)) { + // throwing anything other than instances of IOException result + // in this coprocessor being unloaded + // DoNotRetryIOException tells HBase not to retry this mutation + // multiple times + throw new DoNotRetryIOException(); + } + } + + private boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) { + String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); + return TABLE_NAME_TO_FAIL.equals(tableName) && Bytes.equals(ROW_TO_FAIL, put.getRow()); + } + + private boolean shouldFailDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete) { + String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); + return TABLE_NAME_TO_FAIL.equals(tableName) && + // Phoenix deletes are sent as Mutations with empty values + delete.getFamilyCellMap().firstEntry().getValue().get(0).getValueLength() == 0 && + delete.getFamilyCellMap().firstEntry().getValue().get(0).getQualifierLength() == 0; + } + } + + /** + * Used for ordering {@link MutationState#mutations} map. + */ + private static class TableRefComparator implements Comparator<TableRef> { + @Override + public int compare(TableRef tr1, TableRef tr2) { + return tr1.getTable().getPhysicalName().getString().compareTo(tr2.getTable().getPhysicalName().getString()); + } + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java index 1cdd508,6b2309e..2396719 --- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java @@@ -312,11 -312,11 +312,11 @@@ public class EndToEndCoveredColumnsInde HTable primary = new HTable(UTIL.getConfiguration(), tableNameBytes); // overwrite the codec so we can verify the current state - HRegion region = UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0); + Region region = UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0); Indexer indexer = (Indexer) region.getCoprocessorHost().findCoprocessor(Indexer.class.getName()); - CoveredColumnsIndexBuilder builder = - (CoveredColumnsIndexBuilder) indexer.getBuilderForTesting(); + NonTxIndexBuilder builder = + (NonTxIndexBuilder) indexer.getBuilderForTesting(); VerifyingIndexCodec codec = new VerifyingIndexCodec(); builder.setIndexCodecForTesting(codec); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java index 0000000,138c75d..acf01dc mode 000000,100644..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateFunctionCompiler.java @@@ -1,0 -1,82 +1,72 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.phoenix.compile; + + import java.sql.ParameterMetaData; + import java.sql.SQLException; + import java.sql.SQLFeatureNotSupportedException; + import java.util.Collections; + + import org.apache.hadoop.hbase.client.Scan; + import org.apache.phoenix.execute.MutationState; + import org.apache.phoenix.jdbc.PhoenixConnection; + import org.apache.phoenix.jdbc.PhoenixStatement; + import org.apache.phoenix.parse.CreateFunctionStatement; + import org.apache.phoenix.schema.MetaDataClient; + + public class CreateFunctionCompiler { + + private final PhoenixStatement statement; + + public CreateFunctionCompiler(PhoenixStatement statement) { + this.statement = statement; + } + + public MutationPlan compile(final CreateFunctionStatement create) throws SQLException { + final PhoenixConnection connection = statement.getConnection(); + PhoenixConnection connectionToBe = connection; + final StatementContext context = new StatementContext(statement); + final MetaDataClient client = new MetaDataClient(connectionToBe); + - return new MutationPlan() { - - @Override - public ParameterMetaData getParameterMetaData() { - return context.getBindManager().getParameterMetaData(); - } ++ return new BaseMutationPlan(context, create.getOperation()) { + + @Override + public MutationState execute() throws SQLException { + try { + return client.createFunction(create); + } finally { + if (client.getConnection() != connection) { + client.getConnection().close(); + } + } + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return new ExplainPlan(Collections.singletonList("CREATE" + + (create.getFunctionInfo().isReplace() ? " OR REPLACE" : "") + + " FUNCTION")); + } + + @Override - public PhoenixConnection getConnection() { - return connection; - } - - @Override + public StatementContext getContext() { + return context; + } + }; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java index 49e9059,a5adc49..1ffd36e --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java @@@ -40,7 -41,7 +40,8 @@@ import org.apache.phoenix.expression.Ro import org.apache.phoenix.expression.visitor.StatelessTraverseNoExpressionVisitor; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; + import org.apache.phoenix.parse.BindParseNode; import org.apache.phoenix.parse.ColumnParseNode; import org.apache.phoenix.parse.CreateTableStatement; import org.apache.phoenix.parse.ParseNode; @@@ -62,12 -67,11 +67,13 @@@ import com.google.common.collect.Iterat public class CreateTableCompiler { + private static final PDatum VARBINARY_DATUM = new VarbinaryDatum(); private final PhoenixStatement statement; + private final Operation operation; - public CreateTableCompiler(PhoenixStatement statement) { + public CreateTableCompiler(PhoenixStatement statement, Operation operation) { this.statement = statement; + this.operation = operation; } public MutationPlan compile(final CreateTableStatement create) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index f0ba8e9,96588d1..bbc16b2 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@@ -1,5 -1,5 +1,4 @@@ /* -- * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@@ -90,15 -91,14 +92,16 @@@ public class DeleteCompiler private static ParseNodeFactory FACTORY = new ParseNodeFactory(); private final PhoenixStatement statement; + private final Operation operation; - public DeleteCompiler(PhoenixStatement statement) { + public DeleteCompiler(PhoenixStatement statement, Operation operation) { this.statement = statement; + this.operation = operation; } - private static MutationState deleteRows(PhoenixStatement statement, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException { + private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException { PTable table = targetTableRef.getTable(); + PhoenixStatement statement = childContext.getStatement(); PhoenixConnection connection = statement.getConnection(); PName tenantId = connection.getTenantId(); byte[] tenantIdBytes = null; @@@ -345,12 -331,12 +353,12 @@@ hint, false, aliasedNodes, delete.getWhere(), Collections.<ParseNode>emptyList(), null, delete.getOrderBy(), delete.getLimit(), - delete.getBindCount(), false, false); + delete.getBindCount(), false, false, Collections.<SelectStatement>emptyList(), delete.getUdfParseNodes()); - select = StatementNormalizer.normalize(select, resolver); - SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolver, connection); + select = StatementNormalizer.normalize(select, resolverToBe); + SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolverToBe, connection); if (transformedSelect != select) { - resolver = FromCompiler.getResolverForQuery(transformedSelect, connection); - select = StatementNormalizer.normalize(transformedSelect, resolver); + resolverToBe = FromCompiler.getResolverForQuery(transformedSelect, connection); + select = StatementNormalizer.normalize(transformedSelect, resolverToBe); } parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection); QueryOptimizer optimizer = new QueryOptimizer(services); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java index 855915d,dd92b00..a9dd616 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java @@@ -492,7 -651,7 +658,7 @@@ public class FromCompiler PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, null, null, columns, null, null, Collections.<PTable>emptyList(), false, Collections.<PName>emptyList(), null, null, false, false, false, null, -- null, null, false); ++ null, null, false, false); String alias = subselectNode.getAlias(); TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index daec761,e4ca5d9..b55e4aa --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@@ -1296,8 -1297,12 +1297,12 @@@ public class JoinCompiler } return PTableImpl.makePTable(left.getTenantId(), left.getSchemaName(), - PNameFactory.newName(SchemaUtil.getTableName(left.getName().getString(), right.getName().getString())), left.getType(), left.getIndexState(), left.getTimeStamp(), left.getSequenceNumber(), left.getPKName(), left.getBucketNum(), merged, - left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(), left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL, left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(), left.getIndexType(), left.isTransactional()); + PNameFactory.newName(SchemaUtil.getTableName(left.getName().getString(), right.getName().getString())), + left.getType(), left.getIndexState(), left.getTimeStamp(), left.getSequenceNumber(), left.getPKName(), + left.getBucketNum(), merged,left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(), + left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL, + left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(), left.getIndexType(), - left.rowKeyOrderOptimizable()); ++ left.rowKeyOrderOptimizable(), left.isTransactional()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java index 0000000,f93ab03..25bf35f mode 000000,100644..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java @@@ -1,0 -1,233 +1,245 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.phoenix.compile; + + import java.io.IOException; + import java.sql.ParameterMetaData; + import java.sql.SQLException; + import java.util.ArrayList; + import java.util.Collections; + import java.util.List; ++import java.util.Set; + + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.LocatedFileStatus; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.fs.RemoteIterator; + import org.apache.hadoop.hbase.Cell; + import org.apache.hadoop.hbase.CellUtil; + import org.apache.hadoop.hbase.HConstants; + import org.apache.hadoop.hbase.KeyValue.Type; + import org.apache.hadoop.hbase.client.Result; + import org.apache.hadoop.hbase.client.Scan; + import org.apache.hadoop.hbase.io.ImmutableBytesWritable; + import org.apache.phoenix.compile.GroupByCompiler.GroupBy; + import org.apache.phoenix.compile.OrderByCompiler.OrderBy; + import org.apache.phoenix.expression.Determinism; + import org.apache.phoenix.expression.Expression; + import org.apache.phoenix.expression.LiteralExpression; + import org.apache.phoenix.expression.RowKeyColumnExpression; + import org.apache.phoenix.iterate.DefaultParallelScanGrouper; + import org.apache.phoenix.iterate.ParallelScanGrouper; + import org.apache.phoenix.iterate.ResultIterator; + import org.apache.phoenix.jdbc.PhoenixParameterMetaData; + import org.apache.phoenix.jdbc.PhoenixStatement; ++import org.apache.phoenix.jdbc.PhoenixStatement.Operation; + import org.apache.phoenix.parse.FilterableStatement; + import org.apache.phoenix.parse.LiteralParseNode; + import org.apache.phoenix.parse.ParseNodeFactory; + import org.apache.phoenix.query.HBaseFactoryProvider; + import org.apache.phoenix.query.KeyRange; + import org.apache.phoenix.query.QueryServices; + import org.apache.phoenix.schema.PColumn; + import org.apache.phoenix.schema.PColumnImpl; + import org.apache.phoenix.schema.PNameFactory; + import org.apache.phoenix.schema.RowKeyValueAccessor; + import org.apache.phoenix.schema.SortOrder; + import org.apache.phoenix.schema.TableRef; + import org.apache.phoenix.schema.tuple.ResultTuple; + import org.apache.phoenix.schema.tuple.Tuple; + import org.apache.phoenix.schema.types.PVarchar; + import org.apache.phoenix.util.ByteUtil; + import org.apache.phoenix.util.SizedUtil; + + public class ListJarsQueryPlan implements QueryPlan { + + private PhoenixStatement stmt = null; + private StatementContext context = null; + private boolean first = true; + + private static final RowProjector JARS_PROJECTOR; + + static { + List<ExpressionProjector> projectedColumns = new ArrayList<ExpressionProjector>(); + PColumn column = + new PColumnImpl(PNameFactory.newName("jar_location"), null, + PVarchar.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null, + false, null, false); + List<PColumn> columns = new ArrayList<PColumn>(); + columns.add(column); + Expression expression = + new RowKeyColumnExpression(column, new RowKeyValueAccessor(columns, 0)); + projectedColumns.add(new ExpressionProjector("jar_location", "", expression, + true)); + int estimatedByteSize = SizedUtil.KEY_VALUE_SIZE; + JARS_PROJECTOR = new RowProjector(projectedColumns, estimatedByteSize, false); + } + + public ListJarsQueryPlan(PhoenixStatement stmt) { + this.stmt = stmt; + this.context = new StatementContext(stmt); + } + + @Override + public StatementContext getContext() { + return this.context; + } + + @Override + public ParameterMetaData getParameterMetaData() { + return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA; + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return ExplainPlan.EMPTY_PLAN; + } + + @Override + public ResultIterator iterator() throws SQLException { + return iterator(DefaultParallelScanGrouper.getInstance()); + } + + @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { + return new ResultIterator() { + private RemoteIterator<LocatedFileStatus> listFiles = null; + + @Override + public void close() throws SQLException { + + } + + @Override + public Tuple next() throws SQLException { + try { + if(first) { + String dynamicJarsDir = + stmt.getConnection().getQueryServices().getProps() + .get(QueryServices.DYNAMIC_JARS_DIR_KEY); + if(dynamicJarsDir == null) { + throw new SQLException(QueryServices.DYNAMIC_JARS_DIR_KEY + + " is not configured for the listing the jars."); + } + dynamicJarsDir = + dynamicJarsDir.endsWith("/") ? dynamicJarsDir : dynamicJarsDir + '/'; + Configuration conf = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); + Path dynamicJarsDirPath = new Path(dynamicJarsDir); + FileSystem fs = dynamicJarsDirPath.getFileSystem(conf); + listFiles = fs.listFiles(dynamicJarsDirPath, true); + first = false; + } + if(listFiles == null || !listFiles.hasNext()) return null; + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + ParseNodeFactory factory = new ParseNodeFactory(); + LiteralParseNode literal = + factory.literal(listFiles.next().getPath().toString()); + LiteralExpression expression = + LiteralExpression.newConstant(literal.getValue(), PVarchar.INSTANCE, + Determinism.ALWAYS); + expression.evaluate(null, ptr); + byte[] rowKey = ByteUtil.copyKeyBytesIfNecessary(ptr); + Cell cell = + CellUtil.createCell(rowKey, HConstants.EMPTY_BYTE_ARRAY, + HConstants.EMPTY_BYTE_ARRAY, System.currentTimeMillis(), + Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY); + List<Cell> cells = new ArrayList<Cell>(1); + cells.add(cell); + return new ResultTuple(Result.create(cells)); + } catch (IOException e) { + throw new SQLException(e); + } + } + + @Override + public void explain(List<String> planSteps) { + } + }; + } + + @Override + public long getEstimatedSize() { + return PVarchar.INSTANCE.getByteSize(); + } + + @Override + public TableRef getTableRef() { + return null; + } + + @Override + public RowProjector getProjector() { + return JARS_PROJECTOR; + } + + @Override + public Integer getLimit() { + return null; + } + + @Override + public OrderBy getOrderBy() { + return OrderBy.EMPTY_ORDER_BY; + } + + @Override + public GroupBy getGroupBy() { + return GroupBy.EMPTY_GROUP_BY; + } + + @Override + public List<KeyRange> getSplits() { + return Collections.emptyList(); + } + + @Override + public List<List<Scan>> getScans() { + return Collections.emptyList(); + } + + @Override + public FilterableStatement getStatement() { + return null; + } + + @Override + public boolean isDegenerate() { + return false; + } + + @Override + public boolean isRowKeyOrdered() { + return false; + } + + @Override + public boolean useRoundRobinIterator() { + return false; + } ++ ++ @Override ++ public Set<TableRef> getSourceRefs() { ++ return Collections.<TableRef>emptySet(); ++ } ++ ++ @Override ++ public Operation getOperation() { ++ return stmt.getUpdateOperation(); ++ } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java index 9a50067,630760c..6a53f80 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java @@@ -53,21 -53,34 +53,34 @@@ public abstract class MutatingParallelI /** * Method that does the actual mutation work */ - abstract protected MutationState mutate(ResultIterator iterator, PhoenixConnection connection) throws SQLException; + abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException; @Override - public PeekingResultIterator newIterator(ResultIterator iterator, Scan scan) throws SQLException { - final PhoenixConnection connection = new PhoenixConnection(this.connection); - MutationState state = mutate(iterator, connection); - public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName) throws SQLException { ++ public PeekingResultIterator newIterator(StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName) throws SQLException { + final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection); + + MutationState state = mutate(parentContext, iterator, clonedConnection); + long totalRowCount = state.getUpdateCount(); - if (connection.getAutoCommit()) { - connection.getMutationState().join(state); - connection.commit(); - ConnectionQueryServices services = connection.getQueryServices(); - int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); - state = new MutationState(maxSize, connection, totalRowCount); + if (clonedConnection.getAutoCommit()) { + clonedConnection.getMutationState().join(state); + clonedConnection.commit(); + ConnectionQueryServices services = clonedConnection.getQueryServices(); + int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + /* + * Everything that was mutated as part of the clonedConnection has been committed. However, we want to + * report the mutation work done using this clonedConnection as part of the overall mutation work of the + * parent connection. So we need to set those metrics in the empty mutation state so that they could be + * combined with the parent connection's mutation metrics (as part of combining mutation state) in the + * close() method of the iterator being returned. Don't combine the read metrics in parent context yet + * though because they are possibly being concurrently modified by other threads at this stage. Instead we + * will get hold of the read metrics when all the mutating iterators are done. + */ + state = MutationState.emptyMutationState(maxSize, clonedConnection); + state.getMutationMetricQueue().combineMetricQueues(clonedConnection.getMutationState().getMutationMetricQueue()); } final MutationState finalState = state; + byte[] value = PLong.INSTANCE.toBytes(totalRowCount); KeyValue keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length); final Tuple tuple = new SingleKeyValueTuple(keyValue); http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java index 1f35b10,d75fe38..96b057d --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java @@@ -31,16 -32,18 +31,18 @@@ import org.apache.phoenix.execute.Aggre import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixParameterMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; + import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.ColumnRef; ++import org.apache.phoenix.schema.FunctionNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; -import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; @@@ -79,31 -80,23 +81,47 @@@ public class PostDDLCompiler public MutationPlan compile(final List<TableRef> tableRefs, final byte[] emptyCF, final byte[] projectCF, final List<PColumn> deleteList, final long timestamp) throws SQLException { - - return new MutationPlan() { - - @Override - public PhoenixConnection getConnection() { - return connection; - } - - @Override - public ParameterMetaData getParameterMetaData() { - return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA; - } - - @Override - public ExplainPlan getExplainPlan() throws SQLException { - return ExplainPlan.EMPTY_PLAN; - } + PhoenixStatement statement = new PhoenixStatement(connection); + final StatementContext context = new StatementContext( + statement, + new ColumnResolver() { + + @Override + public List<TableRef> getTables() { + return tableRefs; + } + + @Override + public TableRef resolveTable(String schemaName, String tableName) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public ColumnRef resolveColumn(String schemaName, String tableName, String colName) + throws SQLException { + throw new UnsupportedOperationException(); + } ++ ++ @Override ++ public List<PFunction> getFunctions() { ++ return Collections.<PFunction>emptyList(); ++ } ++ ++ @Override ++ public PFunction resolveFunction(String functionName) ++ throws SQLException { ++ throw new FunctionNotFoundException(functionName); ++ } ++ ++ @Override ++ public boolean hasUDFs() { ++ return false; ++ } + + }, + scan, + new SequenceManager(statement)); + return new BaseMutationPlan(context, Operation.UPSERT /* FIXME */) { @Override public MutationState execute() throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java index 97b68a5,1c0c469..136ee21 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java @@@ -45,11 -47,12 +47,13 @@@ public interface QueryPlan extends Stat */ public ResultIterator iterator() throws SQLException; + public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException; + public long getEstimatedSize(); - // TODO: change once joins are supported + @Deprecated TableRef getTableRef(); + /** * Returns projector used to formulate resultSet row */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java index cd42fd1,50ec919..a9754b3 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java @@@ -37,10 -38,12 +39,12 @@@ import org.apache.phoenix.expression.De import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; + import org.apache.phoenix.iterate.DefaultParallelScanGrouper; + import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixParameterMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; import org.apache.phoenix.metrics.MetricInfo; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.LiteralParseNode; @@@ -104,11 -100,21 +106,16 @@@ public class TraceQueryPlan implements @Override public ParameterMetaData getParameterMetaData() { - return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA; - } - - @Override - public ExplainPlan getExplainPlan() throws SQLException { - return ExplainPlan.EMPTY_PLAN; + return context.getBindManager().getParameterMetaData(); } - + @Override public ResultIterator iterator() throws SQLException { + return iterator(DefaultParallelScanGrouper.getInstance()); + } + + @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { final PhoenixConnection conn = stmt.getConnection(); if (conn.getTraceScope() == null && !traceStatement.isTraceOn()) { return ResultIterator.EMPTY_ITERATOR; @@@ -223,9 -233,9 +239,14 @@@ public boolean isRowKeyOrdered() { return false; } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return ExplainPlan.EMPTY_PLAN; + } + + @Override + public boolean useRoundRobinIterator() { + return false; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java index d15ee7f,c6aa546..551b05c --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java @@@ -151,7 -152,7 +152,7 @@@ public class TupleProjectionCompiler table.getBucketNum(), projectedColumns, table.getParentSchemaName(), table.getParentName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), - table.getIndexType(), table.isTransactional()); - table.getIndexType(), table.rowKeyOrderOptimizable()); ++ table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional()); } public static PTable createProjectedTable(TableRef tableRef, List<ColumnRef> sourceColumnRefs, boolean retainPKColumns) throws SQLException { @@@ -178,7 -179,7 +179,7 @@@ retainPKColumns ? table.getBucketNum() : null, projectedColumns, null, null, Collections.<PTable>emptyList(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), - null, table.isTransactional()); - null, table.rowKeyOrderOptimizable()); ++ null, table.rowKeyOrderOptimizable(), table.isTransactional()); } // For extracting column references from single select statement http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java index 0000000,afca97a..298303d mode 000000,100644..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java @@@ -1,0 -1,89 +1,89 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.phoenix.compile; + + import java.sql.SQLException; + import java.util.ArrayList; + import java.util.List; + + import org.apache.hadoop.hbase.HConstants; + import org.apache.phoenix.exception.SQLExceptionCode; + import org.apache.phoenix.exception.SQLExceptionInfo; + import org.apache.phoenix.expression.Expression; + import org.apache.phoenix.jdbc.PhoenixStatement; + import org.apache.phoenix.parse.AliasedNode; + import org.apache.phoenix.schema.PColumn; + import org.apache.phoenix.schema.PColumnImpl; + import org.apache.phoenix.schema.PName; + import org.apache.phoenix.schema.PNameFactory; + import org.apache.phoenix.schema.PTable; + import org.apache.phoenix.schema.PTableImpl; + import org.apache.phoenix.schema.PTableType; + import org.apache.phoenix.schema.TableRef; + import org.apache.phoenix.schema.types.PDataType; + + public class UnionCompiler { + private static final PName UNION_FAMILY_NAME = PNameFactory.newName("unionFamilyName"); + private static final PName UNION_SCHEMA_NAME = PNameFactory.newName("unionSchemaName"); + private static final PName UNION_TABLE_NAME = PNameFactory.newName("unionTableName"); + + public static List<QueryPlan> checkProjectionNumAndTypes(List<QueryPlan> selectPlans) throws SQLException { + QueryPlan plan = selectPlans.get(0); + int columnCount = plan.getProjector().getColumnCount(); + List<? extends ColumnProjector> projectors = plan.getProjector().getColumnProjectors(); + List<PDataType> selectTypes = new ArrayList<PDataType>(); + for (ColumnProjector pro : projectors) { + selectTypes.add(pro.getExpression().getDataType()); + } + + for (int i = 1; i < selectPlans.size(); i++) { + plan = selectPlans.get(i); + if (columnCount !=plan.getProjector().getColumnCount()) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.SELECT_COLUMN_NUM_IN_UNIONALL_DIFFS).setMessage(".").build().buildException(); + } + List<? extends ColumnProjector> pros = plan.getProjector().getColumnProjectors(); + for (int j = 0; j < columnCount; j++) { + PDataType type = pros.get(j).getExpression().getDataType(); + if (!type.isCoercibleTo(selectTypes.get(j))) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.SELECT_COLUMN_TYPE_IN_UNIONALL_DIFFS).setMessage(".").build().buildException(); + } + } + } + return selectPlans; + } + + public static TableRef contructSchemaTable(PhoenixStatement statement, QueryPlan plan, List<AliasedNode> selectNodes) throws SQLException { + List<PColumn> projectedColumns = new ArrayList<PColumn>(); + for (int i=0; i< plan.getProjector().getColumnCount(); i++) { + ColumnProjector colProj = plan.getProjector().getColumnProjector(i); + Expression sourceExpression = colProj.getExpression(); + String name = selectNodes == null ? colProj.getName() : selectNodes.get(i).getAlias(); + PColumnImpl projectedColumn = new PColumnImpl(PNameFactory.newName(name), UNION_FAMILY_NAME, + sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(), + i, sourceExpression.getSortOrder(), 500, null, false, sourceExpression.toString(), false); + projectedColumns.add(projectedColumn); + } + Long scn = statement.getConnection().getSCN(); + PTable tempTable = PTableImpl.makePTable(statement.getConnection().getTenantId(), UNION_SCHEMA_NAME, UNION_TABLE_NAME, + PTableType.SUBQUERY, null, HConstants.LATEST_TIMESTAMP, scn == null ? HConstants.LATEST_TIMESTAMP : scn, null, null, + projectedColumns, null, null, null, - true, null, null, null, true, true, true, null, null, null, false); ++ true, null, null, null, true, true, true, null, null, null, false, false); + TableRef tableRef = new TableRef(null, tempTable, 0, false); + return tableRef; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/7d3b544a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 09db9c1,56087c0..6b30ed8 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@@ -205,13 -242,22 +243,24 @@@ public class UpsertCompiler } private final PhoenixStatement statement; + private final Operation operation; - public UpsertCompiler(PhoenixStatement statement) { + public UpsertCompiler(PhoenixStatement statement, Operation operation) { this.statement = statement; + this.operation = operation; } + private static LiteralParseNode getNodeForRowTimestampColumn(PColumn col) { + PDataType type = col.getDataType(); + long dummyValue = 0L; + if (type.isCoercibleTo(PTimestamp.INSTANCE)) { + return new LiteralParseNode(new Timestamp(dummyValue), PTimestamp.INSTANCE); + } else if (type == PLong.INSTANCE || type == PUnsignedLong.INSTANCE) { + return new LiteralParseNode(dummyValue, PLong.INSTANCE); + } + throw new IllegalArgumentException(); + } + public MutationPlan compile(UpsertStatement upsert) throws SQLException { final PhoenixConnection connection = statement.getConnection(); ConnectionQueryServices services = connection.getQueryServices(); @@@ -403,12 -463,12 +468,12 @@@ if (! (select.isAggregate() || select.isDistinct() || select.getLimit() != null || select.hasSequence()) ) { // We can pipeline the upsert select instead of spooling everything to disk first, // if we don't have any post processing that's required. - parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe); + parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe, useServerTimestampToBe); // If we're in the else, then it's not an aggregate, distinct, limited, or sequence using query, // so we might be able to run it entirely on the server side. - // Can't run on same server for transactional data, as we need the row keys for the data - // that is being upserted for conflict detection purposes. - runOnServer = sameTable && isAutoCommit && !table.isTransactional() && !(table.isImmutableRows() && !table.getIndexes().isEmpty()); + // For a table with row timestamp column, we can't guarantee that the row key will reside in the + // region space managed by region servers. So we bail out on executing on server side. - runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && table.getRowTimestampColPos() == -1; ++ runOnServer = sameTable && isAutoCommit && !table.isTransactional() && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && table.getRowTimestampColPos() == -1; } // If we may be able to run on the server, add a hint that favors using the data table // if all else is equal.
