Repository: phoenix Updated Branches: refs/heads/master 45079c4ea -> dd3b7b6c0
http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 79c316a..730f754 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -124,16 +124,12 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Lists; - /** * - * JDBC Connection implementation of Phoenix. - * Currently the following are supported: - * - Statement - * - PreparedStatement - * The connection may only be used with the following options: - * - ResultSet.TYPE_FORWARD_ONLY - * - Connection.TRANSACTION_READ_COMMITTED + * JDBC Connection implementation of Phoenix. Currently the following are + * supported: - Statement - PreparedStatement The connection may only be used + * with the following options: - ResultSet.TYPE_FORWARD_ONLY - + * Connection.TRANSACTION_READ_COMMITTED * * * @since 0.1 @@ -147,14 +143,14 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea private final int mutateBatchSize; private final long mutateBatchSizeBytes; private final Long scn; - private final boolean replayMutations; + private final boolean buildingIndex; private MutationState mutationState; private List<PhoenixStatement> statements = new ArrayList<>(); private boolean isAutoFlush = false; private boolean isAutoCommit = false; private PMetaData metaData; private final PName tenantId; - private final String datePattern; + private final String datePattern; private final String timePattern; private final String timestampPattern; private int statementExecutionCounter; @@ -170,7 +166,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea private final LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue; private TableResultIteratorFactory tableResultIteratorFactory; private boolean isRunningUpgrade; - + static { Tracing.addTraceMetricsSource(); } @@ -181,8 +177,13 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea return props; } - public PhoenixConnection(PhoenixConnection connection, boolean isDescRowKeyOrderUpgrade, boolean isRunningUpgrade) throws SQLException { - this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.metaData, connection.getMutationState(), isDescRowKeyOrderUpgrade, isRunningUpgrade, connection.replayMutations); + public PhoenixConnection(PhoenixConnection connection, + boolean isDescRowKeyOrderUpgrade, boolean isRunningUpgrade) + throws SQLException { + this(connection.getQueryServices(), connection.getURL(), connection + .getClientInfo(), connection.metaData, connection + .getMutationState(), isDescRowKeyOrderUpgrade, + isRunningUpgrade, connection.buildingIndex); this.isAutoCommit = connection.isAutoCommit; this.isAutoFlush = connection.isAutoFlush; this.sampler = connection.sampler; @@ -190,93 +191,140 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } public PhoenixConnection(PhoenixConnection connection) throws SQLException { - this(connection, connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade()); + this(connection, connection.isDescVarLengthRowKeyUpgrade(), connection + .isRunningUpgrade()); } - - public PhoenixConnection(PhoenixConnection connection, MutationState mutationState) throws SQLException { - this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache(), mutationState, connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade(), connection.replayMutations); + + public PhoenixConnection(PhoenixConnection connection, + MutationState mutationState) throws SQLException { + this(connection.getQueryServices(), connection.getURL(), connection + .getClientInfo(), connection.getMetaDataCache(), mutationState, + connection.isDescVarLengthRowKeyUpgrade(), connection + .isRunningUpgrade(), connection.buildingIndex); } - - public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException { + + public PhoenixConnection(PhoenixConnection connection, long scn) + throws SQLException { this(connection.getQueryServices(), connection, scn); } - - public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException { - this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.metaData, connection.getMutationState(), connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade(), connection.replayMutations); + + public PhoenixConnection(ConnectionQueryServices services, + PhoenixConnection connection, long scn) throws SQLException { + this(services, connection.getURL(), newPropsWithSCN(scn, + connection.getClientInfo()), connection.metaData, connection + .getMutationState(), connection.isDescVarLengthRowKeyUpgrade(), + connection.isRunningUpgrade(), connection.buildingIndex); this.isAutoCommit = connection.isAutoCommit; this.isAutoFlush = connection.isAutoFlush; this.sampler = connection.sampler; this.statementExecutionCounter = connection.statementExecutionCounter; } - - public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException { + + public PhoenixConnection(ConnectionQueryServices services, String url, + Properties info, PMetaData metaData) throws SQLException { this(services, url, info, metaData, null, false, false, false); } - - public PhoenixConnection(PhoenixConnection connection, ConnectionQueryServices services, Properties info) throws SQLException { - this(services, connection.url, info, connection.metaData, null, connection.isDescVarLengthRowKeyUpgrade(), connection.isRunningUpgrade(), connection.replayMutations); + + public PhoenixConnection(PhoenixConnection connection, + ConnectionQueryServices services, Properties info) + throws SQLException { + this(services, connection.url, info, connection.metaData, null, + connection.isDescVarLengthRowKeyUpgrade(), connection + .isRunningUpgrade(), connection.buildingIndex); } - - private PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData, MutationState mutationState, boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade, boolean replayMutations) throws SQLException { + + private PhoenixConnection(ConnectionQueryServices services, String url, + Properties info, PMetaData metaData, MutationState mutationState, + boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade, + boolean buildingIndex) throws SQLException { GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment(); this.url = url; this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade; - // Filter user provided properties based on property policy, if provided. + // Filter user provided properties based on property policy, if + // provided. PropertyPolicyProvider.getPropertyPolicy().evaluate(info); // Copy so client cannot change - this.info = info == null ? new Properties() : PropertiesUtil.deepCopy(info); + this.info = info == null ? new Properties() : PropertiesUtil + .deepCopy(info); final PName tenantId = JDBCUtil.getTenantId(url, info); if (this.info.isEmpty() && tenantId == null) { this.services = services; } else { - // Create child services keyed by tenantId to track resource usage for + // Create child services keyed by tenantId to track resource usage + // for // a tenantId for all connections on this JVM. if (tenantId != null) { - services = services.getChildQueryServices(tenantId.getBytesPtr()); + services = services.getChildQueryServices(tenantId + .getBytesPtr()); } ReadOnlyProps currentProps = services.getProps(); - final ReadOnlyProps augmentedProps = currentProps.addAll(filterKnownNonProperties(this.info)); - this.services = augmentedProps == currentProps ? services : new DelegateConnectionQueryServices(services) { + final ReadOnlyProps augmentedProps = currentProps + .addAll(filterKnownNonProperties(this.info)); + this.services = augmentedProps == currentProps ? services + : new DelegateConnectionQueryServices(services) { @Override public ReadOnlyProps getProps() { return augmentedProps; } }; } - + Long scnParam = JDBCUtil.getCurrentSCN(url, this.info); checkScn(scnParam); - Long replayAtParam = JDBCUtil.getReplayAt(url, this.info); - checkReplayAt(replayAtParam); - checkScnAndReplayAtEquality(scnParam,replayAtParam); - - this.scn = scnParam != null ? scnParam : replayAtParam; - this.replayMutations = replayMutations || replayAtParam != null; - this.isAutoFlush = this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED) - && this.services.getProps().getBoolean(QueryServices.AUTO_FLUSH_ATTRIB, QueryServicesOptions.DEFAULT_AUTO_FLUSH) ; + Long buildIndexAtParam = JDBCUtil.getBuildIndexSCN(url, this.info); + checkBuildIndexAt(buildIndexAtParam); + checkScnAndBuildIndexAtEquality(scnParam, buildIndexAtParam); + + this.scn = scnParam != null ? scnParam : buildIndexAtParam; + this.buildingIndex = buildingIndex || buildIndexAtParam != null; + this.isAutoFlush = this.services.getProps().getBoolean( + QueryServices.TRANSACTIONS_ENABLED, + QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED) + && this.services.getProps().getBoolean( + QueryServices.AUTO_FLUSH_ATTRIB, + QueryServicesOptions.DEFAULT_AUTO_FLUSH); this.isAutoCommit = JDBCUtil.getAutoCommit( - url, this.info, + url, + this.info, this.services.getProps().getBoolean( QueryServices.AUTO_COMMIT_ATTRIB, QueryServicesOptions.DEFAULT_AUTO_COMMIT)); - this.consistency = JDBCUtil.getConsistencyLevel(url, this.info, this.services.getProps() - .get(QueryServices.CONSISTENCY_ATTRIB, - QueryServicesOptions.DEFAULT_CONSISTENCY_LEVEL)); - // currently we are not resolving schema set through property, so if schema doesn't exists ,connection will not fail + this.consistency = JDBCUtil.getConsistencyLevel( + url, + this.info, + this.services.getProps().get(QueryServices.CONSISTENCY_ATTRIB, + QueryServicesOptions.DEFAULT_CONSISTENCY_LEVEL)); + // currently we are not resolving schema set through property, so if + // schema doesn't exists ,connection will not fail // but queries may fail - this.schema = JDBCUtil.getSchema(url, this.info, - this.services.getProps().get(QueryServices.SCHEMA_ATTRIB, QueryServicesOptions.DEFAULT_SCHEMA)); + this.schema = JDBCUtil.getSchema( + url, + this.info, + this.services.getProps().get(QueryServices.SCHEMA_ATTRIB, + QueryServicesOptions.DEFAULT_SCHEMA)); this.tenantId = tenantId; - this.mutateBatchSize = JDBCUtil.getMutateBatchSize(url, this.info, this.services.getProps()); - this.mutateBatchSizeBytes = JDBCUtil.getMutateBatchSizeBytes(url, this.info, this.services.getProps()); - datePattern = this.services.getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT); - timePattern = this.services.getProps().get(QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT); - timestampPattern = this.services.getProps().get(QueryServices.TIMESTAMP_FORMAT_ATTRIB, DateUtil.DEFAULT_TIMESTAMP_FORMAT); - String numberPattern = this.services.getProps().get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT); - int maxSize = this.services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); - int maxSizeBytes = this.services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); + this.mutateBatchSize = JDBCUtil.getMutateBatchSize(url, this.info, + this.services.getProps()); + this.mutateBatchSizeBytes = JDBCUtil.getMutateBatchSizeBytes(url, + this.info, this.services.getProps()); + datePattern = this.services.getProps().get( + QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT); + timePattern = this.services.getProps().get( + QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT); + timestampPattern = this.services.getProps().get( + QueryServices.TIMESTAMP_FORMAT_ATTRIB, + DateUtil.DEFAULT_TIMESTAMP_FORMAT); + String numberPattern = this.services.getProps().get( + QueryServices.NUMBER_FORMAT_ATTRIB, + NumberUtil.DEFAULT_NUMBER_FORMAT); + int maxSize = this.services.getProps().getInt( + QueryServices.MAX_MUTATION_SIZE_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + int maxSizeBytes = this.services.getProps().getInt( + QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); Format dateFormat = DateUtil.getDateFormatter(datePattern); Format timeFormat = DateUtil.getDateFormatter(timePattern); Format timestampFormat = DateUtil.getDateFormatter(timestampPattern); @@ -286,28 +334,36 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea formatters.put(PUnsignedDate.INSTANCE, dateFormat); formatters.put(PUnsignedTime.INSTANCE, timeFormat); formatters.put(PUnsignedTimestamp.INSTANCE, timestampFormat); - formatters.put(PDecimal.INSTANCE, FunctionArgumentType.NUMERIC.getFormatter(numberPattern)); - // We do not limit the metaData on a connection less than the global one, + formatters.put(PDecimal.INSTANCE, + FunctionArgumentType.NUMERIC.getFormatter(numberPattern)); + // We do not limit the metaData on a connection less than the global + // one, // as there's not much that will be cached here. Pruner pruner = new Pruner() { @Override public boolean prune(PTable table) { - long maxTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; - return (table.getType() != PTableType.SYSTEM && - ( table.getTimeStamp() >= maxTimestamp || - (table.getTenantId()!=null && ! Objects.equal(tenantId, table.getTenantId())))); + long maxTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP + : scn; + return (table.getType() != PTableType.SYSTEM && (table + .getTimeStamp() >= maxTimestamp || (table.getTenantId() != null && !Objects + .equal(tenantId, table.getTenantId())))); } - + @Override public boolean prune(PFunction function) { - long maxTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; - return ( function.getTimeStamp() >= maxTimestamp || - (function.getTenantId()!=null && ! Objects.equal(tenantId, function.getTenantId()))); + long maxTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP + : scn; + return (function.getTimeStamp() >= maxTimestamp || (function + .getTenantId() != null && !Objects.equal(tenantId, + function.getTenantId()))); } }; - this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info, this.services.getProps()); - this.mutationState = mutationState == null ? newMutationState(maxSize, maxSizeBytes) : new MutationState(mutationState); + this.isRequestLevelMetricsEnabled = JDBCUtil + .isCollectingRequestLevelMetricsEnabled(url, info, + this.services.getProps()); + this.mutationState = mutationState == null ? newMutationState(maxSize, + maxSizeBytes) : new MutationState(mutationState); this.metaData = metaData; this.metaData.pruneTables(pruner); this.metaData.pruneFunctions(pruner); @@ -321,22 +377,28 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea this.isRunningUpgrade = isRunningUpgrade; GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment(); } - + private static void checkScn(Long scnParam) throws SQLException { if (scnParam != null && scnParam < 0) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_SCN).build().buildException(); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_SCN) + .build().buildException(); } } - private static void checkReplayAt(Long replayAtParam) throws SQLException { + private static void checkBuildIndexAt(Long replayAtParam) throws SQLException { if (replayAtParam != null && replayAtParam < 0) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_REPLAY_AT).build().buildException(); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.INVALID_REPLAY_AT).build() + .buildException(); } } - private static void checkScnAndReplayAtEquality(Long scnParam, Long replayAt) throws SQLException { + private static void checkScnAndBuildIndexAtEquality(Long scnParam, Long replayAt) + throws SQLException { if (scnParam != null && replayAt != null && !scnParam.equals(replayAt)) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.UNEQUAL_SCN_AND_REPLAY_AT).build().buildException(); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.UNEQUAL_SCN_AND_BUILD_INDEX_AT).build() + .buildException(); } } @@ -354,21 +416,22 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } private ImmutableMap<String, String> getImmutableCustomTracingAnnotations() { - Builder<String, String> result = ImmutableMap.builder(); - result.putAll(JDBCUtil.getAnnotations(url, info)); - if (getSCN() != null) { - result.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, getSCN().toString()); - } - if (getTenantId() != null) { - result.put(PhoenixRuntime.TENANT_ID_ATTRIB, getTenantId().getString()); - } - return result.build(); + Builder<String, String> result = ImmutableMap.builder(); + result.putAll(JDBCUtil.getAnnotations(url, info)); + if (getSCN() != null) { + result.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, getSCN().toString()); + } + if (getTenantId() != null) { + result.put(PhoenixRuntime.TENANT_ID_ATTRIB, getTenantId() + .getString()); + } + return result.build(); } public Sampler<?> getSampler() { return this.sampler; } - + public void setSampler(Sampler<?> sampler) throws SQLException { this.sampler = sampler; } @@ -377,7 +440,8 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea return customTracingAnnotations; } - public int executeStatements(Reader reader, List<Object> binds, PrintStream out) throws IOException, SQLException { + public int executeStatements(Reader reader, List<Object> binds, + PrintStream out) throws IOException, SQLException { int bindsOffset = 0; int nStatements = 0; PhoenixStatementParser parser = new PhoenixStatementParser(reader); @@ -386,9 +450,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea PhoenixPreparedStatement stmt = null; try { stmt = new PhoenixPreparedStatement(this, parser); - ParameterMetaData paramMetaData = stmt.getParameterMetaData(); + ParameterMetaData paramMetaData = stmt + .getParameterMetaData(); for (int i = 0; i < paramMetaData.getParameterCount(); i++) { - stmt.setObject(i+1, binds.get(bindsOffset+i)); + stmt.setObject(i + 1, binds.get(bindsOffset + i)); } long start = System.currentTimeMillis(); boolean isQuery = stmt.execute(); @@ -404,20 +469,30 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea ResultSetMetaData md = rs.getMetaData(); columnCount = md.getColumnCount(); for (int i = 1; i <= columnCount; i++) { - int displayWidth = md.getColumnDisplaySize(i); + int displayWidth = md + .getColumnDisplaySize(i); String label = md.getColumnLabel(i); if (md.isSigned(i)) { - out.print(displayWidth < label.length() ? label.substring(0,displayWidth) : Strings.padStart(label, displayWidth, ' ')); + out.print(displayWidth < label.length() ? label + .substring(0, displayWidth) + : Strings.padStart(label, + displayWidth, ' ')); out.print(' '); } else { - out.print(displayWidth < label.length() ? label.substring(0,displayWidth) : Strings.padEnd(md.getColumnLabel(i), displayWidth, ' ')); + out.print(displayWidth < label.length() ? label + .substring(0, displayWidth) + : Strings.padEnd( + md.getColumnLabel(i), + displayWidth, ' ')); out.print(' '); } } out.println(); for (int i = 1; i <= columnCount; i++) { - int displayWidth = md.getColumnDisplaySize(i); - out.print(Strings.padStart("", displayWidth,'-')); + int displayWidth = md + .getColumnDisplaySize(i); + out.print(Strings.padStart("", + displayWidth, '-')); out.print(' '); } out.println(); @@ -426,13 +501,19 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea if (out != null) { ResultSetMetaData md = rs.getMetaData(); for (int i = 1; i <= columnCount; i++) { - int displayWidth = md.getColumnDisplaySize(i); + int displayWidth = md + .getColumnDisplaySize(i); String value = rs.getString(i); - String valueString = value == null ? QueryConstants.NULL_DISPLAY_TEXT : value; + String valueString = value == null ? QueryConstants.NULL_DISPLAY_TEXT + : value; if (md.isSigned(i)) { - out.print(Strings.padStart(valueString, displayWidth, ' ')); + out.print(Strings.padStart( + valueString, displayWidth, + ' ')); } else { - out.print(Strings.padEnd(valueString, displayWidth, ' ')); + out.print(Strings.padEnd( + valueString, displayWidth, + ' ')); } out.print(' '); } @@ -440,10 +521,12 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } } while (rs.next()); } - } else if (out != null){ + } else if (out != null) { int updateCount = stmt.getUpdateCount(); if (updateCount >= 0) { - out.println((updateCount == 0 ? "no" : updateCount) + (updateCount == 1 ? " row " : " rows ") + stmt.getUpdateOperation().toString()); + out.println((updateCount == 0 ? "no" : updateCount) + + (updateCount == 1 ? " row " : " rows ") + + stmt.getUpdateOperation().toString()); } } bindsOffset += paramMetaData.getParameterCount(); @@ -464,59 +547,59 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea public @Nullable PName getTenantId() { return tenantId; } - + public Long getSCN() { return scn; } - - public boolean isReplayMutations() { - return replayMutations; + + public boolean isBuildingIndex() { + return buildingIndex; } - + public int getMutateBatchSize() { return mutateBatchSize; } - public long getMutateBatchSizeBytes(){ + public long getMutateBatchSizeBytes() { return mutateBatchSizeBytes; } public PMetaData getMetaDataCache() { return metaData; } - + public PTable getTable(PTableKey key) throws TableNotFoundException { - return metaData.getTableRef(key).getTable(); + return metaData.getTableRef(key).getTable(); } - + public PTableRef getTableRef(PTableKey key) throws TableNotFoundException { - return metaData.getTableRef(key); + return metaData.getTableRef(key); } protected MutationState newMutationState(int maxSize, int maxSizeBytes) { return new MutationState(maxSize, maxSizeBytes, this); } - + public MutationState getMutationState() { return mutationState; } - + public String getDatePattern() { return datePattern; } - + public Format getFormatter(PDataType type) { return formatters.get(type); } - + public String getURL() { return url; } - + public ConnectionQueryServices getQueryServices() { return services; } - + @Override public void clearWarnings() throws SQLException { } @@ -538,13 +621,15 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } } } - + private void checkOpen() throws SQLException { if (isClosed) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.CONNECTION_CLOSED).build().buildException(); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.CONNECTION_CLOSED).build() + .buildException(); } } - + @Override public void close() throws SQLException { if (isClosed) { @@ -580,10 +665,12 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } @Override - public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + public Array createArrayOf(String typeName, Object[] elements) + throws SQLException { checkOpen(); - PDataType arrayPrimitiveType = PDataType.fromSqlTypeName(typeName); - return PArrayDataType.instantiatePhoenixArray(arrayPrimitiveType, elements); + PDataType arrayPrimitiveType = PDataType.fromSqlTypeName(typeName); + return PArrayDataType.instantiatePhoenixArray(arrayPrimitiveType, + elements); } @Override @@ -609,7 +696,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea public List<PhoenixStatement> getStatements() { return statements; } - + @Override public Statement createStatement() throws SQLException { checkOpen(); @@ -620,28 +707,33 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea /** * Back-door way to inject processing into walking through a result set + * * @param statementFactory * @return PhoenixStatement * @throws SQLException */ - public PhoenixStatement createStatement(PhoenixStatementFactory statementFactory) throws SQLException { + public PhoenixStatement createStatement( + PhoenixStatementFactory statementFactory) throws SQLException { PhoenixStatement statement = statementFactory.newStatement(this); statements.add(statement); return statement; } @Override - public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + public Statement createStatement(int resultSetType, int resultSetConcurrency) + throws SQLException { checkOpen(); - if (resultSetType != ResultSet.TYPE_FORWARD_ONLY || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) { + if (resultSetType != ResultSet.TYPE_FORWARD_ONLY + || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) { throw new SQLFeatureNotSupportedException(); } return createStatement(); } @Override - public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) - throws SQLException { + public Statement createStatement(int resultSetType, + int resultSetConcurrency, int resultSetHoldability) + throws SQLException { checkOpen(); if (resultSetHoldability != ResultSet.CLOSE_CURSORS_AT_COMMIT) { throw new SQLFeatureNotSupportedException(); @@ -650,7 +742,8 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } @Override - public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + public Struct createStruct(String typeName, Object[] attributes) + throws SQLException { throw new SQLFeatureNotSupportedException(); } @@ -664,8 +757,12 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } public void setAutoFlush(boolean autoFlush) throws SQLException { - if (autoFlush && !this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH) + if (autoFlush + && !this.services.getProps().getBoolean( + QueryServices.TRANSACTIONS_ENABLED, + QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH) .build().buildException(); } this.isAutoFlush = autoFlush; @@ -674,20 +771,26 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea public void flush() throws SQLException { mutationState.sendUncommitted(); } - - public void setTransactionContext(PhoenixTransactionContext txContext) throws SQLException { - if (!this.services.getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT) + + public void setTransactionContext(PhoenixTransactionContext txContext) + throws SQLException { + if (!this.services.getProps().getBoolean( + QueryServices.TRANSACTIONS_ENABLED, + QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT) .build().buildException(); } this.mutationState.rollback(); - this.mutationState = new MutationState(this.mutationState.getMaxSize(), this.mutationState.getMaxSizeBytes(), this, txContext); - - // Write data to HBase after each statement execution as the commit may not + this.mutationState = new MutationState(this.mutationState.getMaxSize(), + this.mutationState.getMaxSizeBytes(), this, txContext); + + // Write data to HBase after each statement execution as the commit may + // not // come through Phoenix APIs. setAutoFlush(true); } - + public Consistency getConsistency() { return this.consistency; } @@ -698,7 +801,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } @Override - public Properties getClientInfo() throws SQLException { + public Properties getClientInfo() throws SQLException { // Defensive copy so client cannot change return new Properties(info); } @@ -721,10 +824,11 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea @Override public int getTransactionIsolation() throws SQLException { - boolean transactionsEnabled = getQueryServices().getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, + boolean transactionsEnabled = getQueryServices().getProps().getBoolean( + QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED); - return transactionsEnabled ? - Connection.TRANSACTION_REPEATABLE_READ : Connection.TRANSACTION_READ_COMMITTED; + return transactionsEnabled ? Connection.TRANSACTION_REPEATABLE_READ + : Connection.TRANSACTION_READ_COMMITTED; } @Override @@ -744,7 +848,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea @Override public boolean isReadOnly() throws SQLException { - return readOnly; + return readOnly || (scn != null && !buildingIndex && !isRunningUpgrade); } @Override @@ -764,58 +868,66 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } @Override - public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + public CallableStatement prepareCall(String sql, int resultSetType, + int resultSetConcurrency) throws SQLException { throw new SQLFeatureNotSupportedException(); } @Override - public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, - int resultSetHoldability) throws SQLException { + public CallableStatement prepareCall(String sql, int resultSetType, + int resultSetConcurrency, int resultSetHoldability) + throws SQLException { throw new SQLFeatureNotSupportedException(); } @Override public PreparedStatement prepareStatement(String sql) throws SQLException { checkOpen(); - PhoenixPreparedStatement statement = new PhoenixPreparedStatement(this, sql); + PhoenixPreparedStatement statement = new PhoenixPreparedStatement(this, + sql); statements.add(statement); return statement; } @Override - public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) + throws SQLException { checkOpen(); // Ignore autoGeneratedKeys, and just execute the statement. return prepareStatement(sql); } @Override - public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) + throws SQLException { checkOpen(); // Ignore columnIndexes, and just execute the statement. return prepareStatement(sql); } @Override - public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + public PreparedStatement prepareStatement(String sql, String[] columnNames) + throws SQLException { checkOpen(); // Ignore columnNames, and just execute the statement. return prepareStatement(sql); } @Override - public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) - throws SQLException { + public PreparedStatement prepareStatement(String sql, int resultSetType, + int resultSetConcurrency) throws SQLException { checkOpen(); - if (resultSetType != ResultSet.TYPE_FORWARD_ONLY || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) { + if (resultSetType != ResultSet.TYPE_FORWARD_ONLY + || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) { throw new SQLFeatureNotSupportedException(); } return prepareStatement(sql); } @Override - public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, - int resultSetHoldability) throws SQLException { + public PreparedStatement prepareStatement(String sql, int resultSetType, + int resultSetConcurrency, int resultSetHoldability) + throws SQLException { checkOpen(); if (resultSetHoldability != ResultSet.CLOSE_CURSORS_AT_COMMIT) { throw new SQLFeatureNotSupportedException(); @@ -864,20 +976,22 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea throw new SQLFeatureNotSupportedException(); } // TODO: -// if (catalog == null) { -// tenantId = null; -// } else { -// tenantId = PNameFactory.newName(catalog); -// } + // if (catalog == null) { + // tenantId = null; + // } else { + // tenantId = PNameFactory.newName(catalog); + // } } @Override - public void setClientInfo(Properties properties) throws SQLClientInfoException { + public void setClientInfo(Properties properties) + throws SQLClientInfoException { throw new UnsupportedOperationException(); } @Override - public void setClientInfo(String name, String value) throws SQLClientInfoException { + public void setClientInfo(String name, String value) + throws SQLClientInfoException { throw new UnsupportedOperationException(); } @@ -889,7 +1003,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea @Override public void setReadOnly(boolean readOnly) throws SQLException { checkOpen(); - this.readOnly=readOnly; + this.readOnly = readOnly; } @Override @@ -905,13 +1019,16 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea @Override public void setTransactionIsolation(int level) throws SQLException { checkOpen(); - boolean transactionsEnabled = getQueryServices().getProps().getBoolean(QueryServices.TRANSACTIONS_ENABLED, + boolean transactionsEnabled = getQueryServices().getProps().getBoolean( + QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED); if (level == Connection.TRANSACTION_SERIALIZABLE) { throw new SQLFeatureNotSupportedException(); } - if (!transactionsEnabled && level == Connection.TRANSACTION_REPEATABLE_READ) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_ISOLATION_LEVEL) + if (!transactionsEnabled + && level == Connection.TRANSACTION_REPEATABLE_READ) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_ISOLATION_LEVEL) .build().buildException(); } } @@ -930,11 +1047,14 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea @Override public <T> T unwrap(Class<T> iface) throws SQLException { if (!iface.isInstance(this)) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.CLASS_NOT_UNWRAPPABLE) - .setMessage(this.getClass().getName() + " not unwrappable from " + iface.getName()) - .build().buildException(); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.CLASS_NOT_UNWRAPPABLE) + .setMessage( + this.getClass().getName() + + " not unwrappable from " + + iface.getName()).build().buildException(); } - return (T)this; + return (T) this; } @Override @@ -958,7 +1078,8 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } @Override - public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + public void setNetworkTimeout(Executor executor, int milliseconds) + throws SQLException { checkOpen(); } @@ -967,21 +1088,22 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea // TODO Auto-generated method stub return 0; } - + @Override public void addTable(PTable table, long resolvedTime) throws SQLException { metaData.addTable(table, resolvedTime); - //Cascade through to connectionQueryServices too + // Cascade through to connectionQueryServices too getQueryServices().addTable(table, resolvedTime); } - + @Override - public void updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException { - metaData.updateResolvedTimestamp(table, resolvedTime); - //Cascade through to connectionQueryServices too + public void updateResolvedTimestamp(PTable table, long resolvedTime) + throws SQLException { + metaData.updateResolvedTimestamp(table, resolvedTime); + // Cascade through to connectionQueryServices too getQueryServices().updateResolvedTimestamp(table, resolvedTime); } - + @Override public void addFunction(PFunction function) throws SQLException { // TODO: since a connection is only used by one thread at a time, @@ -989,7 +1111,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea if (scn == null || scn > function.getTimeStamp()) { metaData.addFunction(function); } - //Cascade through to connectionQueryServices too + // Cascade through to connectionQueryServices too getQueryServices().addFunction(function); } @@ -1001,46 +1123,57 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } @Override - public void removeTable(PName tenantId, String tableName, String parentTableName, long tableTimeStamp) throws SQLException { - metaData.removeTable(tenantId, tableName, parentTableName, tableTimeStamp); - //Cascade through to connectionQueryServices too - getQueryServices().removeTable(tenantId, tableName, parentTableName, tableTimeStamp); + public void removeTable(PName tenantId, String tableName, + String parentTableName, long tableTimeStamp) throws SQLException { + metaData.removeTable(tenantId, tableName, parentTableName, + tableTimeStamp); + // Cascade through to connectionQueryServices too + getQueryServices().removeTable(tenantId, tableName, parentTableName, + tableTimeStamp); } @Override - public void removeFunction(PName tenantId, String functionName, long tableTimeStamp) throws SQLException { + public void removeFunction(PName tenantId, String functionName, + long tableTimeStamp) throws SQLException { metaData.removeFunction(tenantId, functionName, tableTimeStamp); - //Cascade through to connectionQueryServices too - getQueryServices().removeFunction(tenantId, functionName, tableTimeStamp); + // Cascade through to connectionQueryServices too + getQueryServices().removeFunction(tenantId, functionName, + tableTimeStamp); } @Override - public void removeColumn(PName tenantId, String tableName, List<PColumn> columnsToRemove, long tableTimeStamp, + public void removeColumn(PName tenantId, String tableName, + List<PColumn> columnsToRemove, long tableTimeStamp, long tableSeqNum, long resolvedTime) throws SQLException { - metaData.removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime); - //Cascade through to connectionQueryServices too - getQueryServices().removeColumn(tenantId, tableName, columnsToRemove, tableTimeStamp, tableSeqNum, resolvedTime); + metaData.removeColumn(tenantId, tableName, columnsToRemove, + tableTimeStamp, tableSeqNum, resolvedTime); + // Cascade through to connectionQueryServices too + getQueryServices().removeColumn(tenantId, tableName, columnsToRemove, + tableTimeStamp, tableSeqNum, resolvedTime); } - protected boolean removeStatement(PhoenixStatement statement) throws SQLException { + protected boolean removeStatement(PhoenixStatement statement) + throws SQLException { return statements.remove(statement); - } + } public KeyValueBuilder getKeyValueBuilder() { return this.services.getKeyValueBuilder(); } - + /** - * Used to track executions of {@link Statement}s and {@link PreparedStatement}s that were created from this connection before - * commit or rollback. 0-based. Used to associate partial save errors with SQL statements - * invoked by users. + * Used to track executions of {@link Statement}s and + * {@link PreparedStatement}s that were created from this connection before + * commit or rollback. 0-based. Used to associate partial save errors with + * SQL statements invoked by users. + * * @see CommitException * @see #incrementStatementExecutionCounter() */ public int getStatementExecutionCounter() { - return statementExecutionCounter; - } - + return statementExecutionCounter; + } + public void incrementStatementExecutionCounter() { statementExecutionCounter++; } @@ -1052,19 +1185,21 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea public void setTraceScope(TraceScope traceScope) { this.traceScope = traceScope; } - + public Map<String, Map<MetricType, Long>> getMutationMetrics() { return mutationState.getMutationMetricQueue().aggregate(); } - + public Map<String, Map<MetricType, Long>> getReadMetrics() { - return mutationState.getReadMetricQueue() != null ? mutationState.getReadMetricQueue().aggregate() : Collections.<String, Map<MetricType, Long>>emptyMap(); + return mutationState.getReadMetricQueue() != null ? mutationState + .getReadMetricQueue().aggregate() : Collections + .<String, Map<MetricType, Long>> emptyMap(); } - + public boolean isRequestLevelMetricsEnabled() { return isRequestLevelMetricsEnabled; } - + public void clearMetrics() { mutationState.getMutationMetricQueue().clearMetrics(); if (mutationState.getReadMetricQueue() != null) { @@ -1073,28 +1208,30 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea } /** - * Returns true if this connection is being used to upgrade the - * data due to PHOENIX-2067 and false otherwise. + * Returns true if this connection is being used to upgrade the data due to + * PHOENIX-2067 and false otherwise. + * * @return */ public boolean isDescVarLengthRowKeyUpgrade() { return isDescVarLengthRowKeyUpgrade; } - + /** * Added for tests only. Do not use this elsewhere. */ public ParallelIteratorFactory getIteratorFactory() { return parallelIteratorFactory; } - + /** * Added for testing purposes. Do not use this elsewhere. */ - public void setIteratorFactory(ParallelIteratorFactory parallelIteratorFactory) { + public void setIteratorFactory( + ParallelIteratorFactory parallelIteratorFactory) { this.parallelIteratorFactory = parallelIteratorFactory; } - + public void addIteratorForLeaseRenewal(@Nonnull TableResultIterator itr) { if (services.supportsFeature(Feature.RENEW_LEASE)) { checkNotNull(itr); @@ -1125,13 +1262,13 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea getQueryServices().removeSchema(schema, schemaTimeStamp); } - + public boolean isRunningUpgrade() { return isRunningUpgrade; } - + public void setRunningUpgrade(boolean isRunningUpgrade) { this.isRunningUpgrade = isRunningUpgrade; } - + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java index 25729d6..2871809 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java @@ -30,7 +30,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.RegionSizeCalculator; import org.apache.hadoop.io.NullWritable; @@ -46,7 +48,6 @@ import org.apache.phoenix.iterate.MapReduceParallelScanGrouper; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; -import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.util.PhoenixRuntime; http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java index c1db27c..eb4bc0e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java @@ -84,7 +84,7 @@ public class PhoenixIndexImportDirectMapper extends String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); String txScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE); if(txScnValue==null) { - overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn); + overrideProps.put(PhoenixRuntime.BUILD_INDEX_AT_ATTRIB, scn); } connection = ConnectionUtil.getOutputConnection(configuration, overrideProps); connection.setAutoCommit(false); http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java index 7551527..9e0d629 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java @@ -75,7 +75,7 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); String txScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE); if(txScnValue==null) { - overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn); + overrideProps.put(PhoenixRuntime.BUILD_INDEX_AT_ATTRIB, scn); } connection = ConnectionUtil.getOutputConnection(configuration,overrideProps); connection.setAutoCommit(false); http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java index 0ead358..5b85da5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java @@ -80,7 +80,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); String txScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE); if(txScnValue==null && scn!=null) { - overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn); + overrideProps.put(PhoenixRuntime.BUILD_INDEX_AT_ATTRIB, scn); } connection = ConnectionUtil.getOutputConnection(configuration, overrideProps).unwrap(PhoenixConnection.class); connection.setAutoCommit(false); http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 5ba134c..1a1e571 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -2412,6 +2412,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try (PhoenixConnection metaConnection = new PhoenixConnection(ConnectionQueryServicesImpl.this, globalUrl, scnProps, newEmptyMetaData())) { try { + metaConnection.setRunningUpgrade(true); metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA); } catch (NewerTableAlreadyExistsException ignore) { // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index ec05b24..f15e0b1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -294,6 +294,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB); String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB); metaConnection = new PhoenixConnection(this, globalUrl, scnProps, newEmptyMetaData()); + metaConnection.setRunningUpgrade(true); try { metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA); } catch (TableAlreadyExistsException ignore) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 557a149..ed1b3b2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -105,7 +105,6 @@ import static org.apache.phoenix.schema.types.PDataType.FALSE_BYTES; import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; import java.io.IOException; -import java.sql.Connection; import java.sql.Date; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -1210,7 +1209,7 @@ public class MetaDataClient { // If our connection is at a fixed point-in-time, we need to open a new // connection so that our new index table is visible. Properties props = new Properties(connection.getClientInfo()); - props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(connection.getSCN()+1)); + props.setProperty(PhoenixRuntime.BUILD_INDEX_AT_ATTRIB, Long.toString(connection.getSCN()+1)); PhoenixConnection conn = new PhoenixConnection(connection, connection.getQueryServices(), props); MetaDataClient newClientAtNextTimeStamp = new MetaDataClient(conn); @@ -2894,13 +2893,9 @@ public class MetaDataClient { } private void deleteFromStatsTable(List<TableRef> tableRefs, long ts) throws SQLException { - Properties props = new Properties(connection.getClientInfo()); - props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); - Connection conn = new PhoenixConnection(connection.getQueryServices(), connection, ts); - conn.setAutoCommit(true); - boolean success = false; - SQLException sqlException = null; - try { + boolean isAutoCommit = connection.getAutoCommit(); + try { + connection.setAutoCommit(true); StringBuilder buf = new StringBuilder("DELETE FROM SYSTEM.STATS WHERE PHYSICAL_NAME IN ("); for (TableRef ref : tableRefs) { buf.append("'" + ref.getTable().getPhysicalName().getString() + "',"); @@ -2917,25 +2912,9 @@ public class MetaDataClient { } buf.setCharAt(buf.length() - 1, ')'); } - conn.createStatement().execute(buf.toString()); - success = true; - } catch (SQLException e) { - sqlException = e; + connection.createStatement().execute(buf.toString()); } finally { - try { - conn.close(); - } catch (SQLException e) { - if (sqlException == null) { - // If we're not in the middle of throwing another exception - // then throw the exception we got on close. - if (success) { - sqlException = e; - } - } else { - sqlException.setNextException(e); - } - } - if (sqlException != null) { throw sqlException; } + connection.setAutoCommit(isAutoCommit); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java index 7715705..e4f490a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/JDBCUtil.java @@ -133,8 +133,8 @@ public class JDBCUtil { return (scnStr == null ? null : Long.parseLong(scnStr)); } - public static Long getReplayAt(String url, Properties info) throws SQLException { - String scnStr = findProperty(url, info, PhoenixRuntime.REPLAY_AT_ATTRIB); + public static Long getBuildIndexSCN(String url, Properties info) throws SQLException { + String scnStr = findProperty(url, info, PhoenixRuntime.BUILD_INDEX_AT_ATTRIB); return (scnStr == null ? null : Long.parseLong(scnStr)); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index 59b3bb2..16ef206 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -96,7 +96,6 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Maps.EntryTransformer; /** * @@ -123,26 +122,18 @@ public class PhoenixRuntime { /** * Use this connection property to control HBase timestamps - * by specifying your own long timestamp value at connection time. All - * queries will use this as the upper bound of the time range for scans - * and DDL, and DML will use this as t he timestamp for key values. + * by specifying your own long timestamp value at connection time. + * Specifying this property will force the connection to be read + * only - no DML or DDL will be allowed. */ public static final String CURRENT_SCN_ATTRIB = "CurrentSCN"; /** - * Use this connection property to set the long time stamp value at - * which to replay DML statements after a write failure. The time - * stamp value must match the value returned by - * {@link org.apache.phoenix.execute.CommitException#getServerTimestamp()} - * when the exception occurred. Used in conjunction with the - * {@link org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy} - * index write failure policy to provide a means of the client replaying - * updates to ensure that secondary indexes are correctly caught up - * with any data updates when a write failure occurs. The updates - * should be replayed in ascending time stamp order. + * Internal connection property to force an index to be built at a + * given time stamp. */ - public static final String REPLAY_AT_ATTRIB = "ReplayAt"; - + public static final String BUILD_INDEX_AT_ATTRIB = "BuildIndexAt"; + /** * Use this connection property to help with fairness of resource allocation * for the client and server. The value of the attribute determines the http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java index 3f09a54..ca4be2f 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java @@ -82,6 +82,7 @@ import org.apache.phoenix.schema.types.PDecimal; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; @@ -2257,18 +2258,20 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { } @Test - public void testQueryWithSCN() throws Exception { + public void testDMLOfNonIndexWithBuildIndexAt() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(1000)); - props.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); + try (Connection conn = DriverManager.getConnection(getUrl(), props);) { + conn.createStatement().execute( + "CREATE TABLE t (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR)"); + } + props.put(PhoenixRuntime.BUILD_INDEX_AT_ATTRIB, Long.toString(EnvironmentEdgeManager.currentTimeMillis()+1)); try (Connection conn = DriverManager.getConnection(getUrl(), props);) { try { - conn.createStatement().execute( - "CREATE TABLE t (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR) TRANSACTIONAL=true"); + conn.createStatement().execute("UPSERT INTO T (k,v1) SELECT k,v1 FROM T"); fail(); } catch (SQLException e) { assertEquals("Unexpected Exception", - SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET + SQLExceptionCode.ONLY_INDEX_UPDATABLE_AT_SCN .getErrorCode(), e.getErrorCode()); } } @@ -2778,21 +2781,6 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { } @Test - public void testSCNInOnDupKey() throws Exception { - String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=100"; - Connection conn = DriverManager.getConnection(url); - conn.createStatement().execute("CREATE TABLE t1 (k1 integer not null, k2 integer not null, v bigint, constraint pk primary key (k1,k2))"); - try { - conn.createStatement().execute("UPSERT INTO t1 VALUES(0,0) ON DUPLICATE KEY UPDATE v = v + 1"); - fail(); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.CANNOT_SET_SCN_IN_ON_DUP_KEY.getErrorCode(), e.getErrorCode()); - } finally { - conn.close(); - } - } - - @Test public void testOrderPreservingGroupBy() throws Exception { try (Connection conn= DriverManager.getConnection(getUrl())) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/06f58d56/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 8b93b5c..1fe21d2 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -642,13 +642,7 @@ public class TestUtil { conn.createStatement().execute(query); } - public static void analyzeTable(String url, long ts, String tableName) throws IOException, SQLException { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); - analyzeTable(url, props, tableName); - } - - public static void analyzeTable(String url, Properties props, String tableName) throws IOException, SQLException { + public static void analyzeTable(String url, Properties props, String tableName) throws IOException, SQLException { Connection conn = DriverManager.getConnection(url, props); analyzeTable(conn, tableName); conn.close();
