Github user JamesRTaylor commented on a diff in the pull request: https://github.com/apache/phoenix/pull/129#discussion_r45307321 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java --- @@ -301,28 +476,81 @@ public void remove() { }; } + + private void generateMutations(final TableRef tableRef, long timestamp, + final Map<ImmutableBytesPtr, RowMutationState> values, + final List<Mutation> mutationList, + final List<Mutation> mutationsPertainingToIndex) { + final PTable table = tableRef.getTable(); + boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1; + Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>> iterator = values.entrySet().iterator(); + long timestampToUse = timestamp; + while (iterator.hasNext()) { + Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next(); + ImmutableBytesPtr key = rowEntry.getKey(); + RowMutationState state = rowEntry.getValue(); + if (tableWithRowTimestampCol) { + RowTimestampColInfo rowTsColInfo = state.getRowTimestampColInfo(); + if (rowTsColInfo.useServerTimestamp()) { + // regenerate the key with this timestamp. + key = getNewRowKeyWithRowTimestamp(key, timestampToUse, table); + } else { + if (rowTsColInfo.getTimestamp() != null) { + timestampToUse = rowTsColInfo.getTimestamp(); + } + } + } + PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestampToUse, key); + List<Mutation> rowMutations, rowMutationsPertainingToIndex; + if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete + row.delete(); + rowMutations = row.toRowMutations(); + // Row deletes for index tables are processed by running a re-written query + // against the index table (as this allows for flexibility in being able to + // delete rows). 
+ rowMutationsPertainingToIndex = Collections.emptyList(); + } else { + for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().getColumnValues().entrySet()) { + row.setValue(valueEntry.getKey(), valueEntry.getValue()); + } + rowMutations = row.toRowMutations(); + rowMutationsPertainingToIndex = rowMutations; + } + mutationList.addAll(rowMutations); + if (mutationsPertainingToIndex != null) mutationsPertainingToIndex.addAll(rowMutationsPertainingToIndex); + } + } /** * Get the unsorted list of HBase mutations for the tables with uncommitted data. * @return list of HBase mutations for uncommitted data. */ + public Iterator<Pair<byte[],List<Mutation>>> toMutations(Long timestamp) { + return toMutations(false, timestamp); + } + public Iterator<Pair<byte[],List<Mutation>>> toMutations() { - return toMutations(false); + return toMutations(false, null); } public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes) { + return toMutations(includeMutableIndexes, null); + } + + public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) { final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator(); if (!iterator.hasNext()) { return Iterators.emptyIterator(); } Long scn = connection.getSCN(); - final long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; + final long timestamp = (tableTimestamp!=null && tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? HConstants.LATEST_TIMESTAMP : scn); +// final long timestamp = (scn == null ? HConstants.LATEST_TIMESTAMP : scn); --- End diff -- Remove commented out code
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---