priyankporwal commented on a change in pull request #517: PHOENIX-5211
Consistent Immutable Global Indexes for Non-Transactiona…
URL: https://github.com/apache/phoenix/pull/517#discussion_r294916527
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
##########
@@ -931,220 +951,298 @@ private void send(Iterator<TableRef> tableRefIterator)
throws SQLException {
joinMutationState(new TableRef(tableRef),
multiRowMutationState, txMutations);
}
}
- long serverTimestamp = HConstants.LATEST_TIMESTAMP;
- Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator =
physicalTableMutationMap.entrySet()
- .iterator();
- while (mutationsIterator.hasNext()) {
- Entry<TableInfo, List<Mutation>> pair =
mutationsIterator.next();
- TableInfo tableInfo = pair.getKey();
- byte[] htableName = tableInfo.getHTableName().getBytes();
- List<Mutation> mutationList = pair.getValue();
- List<List<Mutation>> mutationBatchList =
- getMutationBatchList(batchSize, batchSizeBytes,
mutationList);
-
- // create a span per target table
- // TODO maybe we can be smarter about the table name to string
here?
- Span child = Tracing.child(span, "Writing mutation batch for
table: " + Bytes.toString(htableName));
-
- int retryCount = 0;
- boolean shouldRetry = false;
- long numMutations = 0;
- long mutationSizeBytes = 0;
- long mutationCommitTime = 0;
- long numFailedMutations = 0;
- ;
- long startTime = 0;
- boolean shouldRetryIndexedMutation = false;
- IndexWriteException iwe = null;
- do {
- TableRef origTableRef = tableInfo.getOrigTableRef();
- PTable table = origTableRef.getTable();
- table.getIndexMaintainers(indexMetaDataPtr, connection);
- final ServerCache cache = tableInfo.isDataTable() ?
-
IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table,
- mutationList, indexMetaDataPtr) : null;
- // If we haven't retried yet, retry for this case only, as
it's possible that
- // a split will occur after we send the index metadata
cache to all known
- // region servers.
- shouldRetry = cache != null;
- SQLException sqlE = null;
- Table hTable =
connection.getQueryServices().getTable(htableName);
- try {
- if (table.isTransactional()) {
- // Track tables to which we've sent uncommitted
data
- if (tableInfo.isDataTable()) {
-
uncommittedPhysicalNames.add(table.getPhysicalName().getString());
- phoenixTransactionContext.markDMLFence(table);
- }
- // Only pass true for last argument if the index
is being written to on it's own (i.e. initial
- // index population), not if it's being written to
for normal maintenance due to writes to
- // the data table. This case is different because
the initial index population does not need
- // to be done transactionally since the index is
only made active after all writes have
- // occurred successfully.
- hTable =
phoenixTransactionContext.getTransactionalTableWriter(connection, table,
hTable, tableInfo.isDataTable() && table.getType() == PTableType.INDEX);
+
+ Map<TableInfo, List<Mutation>> unverifiedIndexMutations = new
LinkedHashMap<>();
+ Map<TableInfo, List<Mutation>> verifiedOrDeletedIndexMutations =
new LinkedHashMap<>();
+ filterIndexCheckerMutations(physicalTableMutationMap,
unverifiedIndexMutations,
+ verifiedOrDeletedIndexMutations);
+
+ // Phase 1: Send verified indexes with VERIFIED=false
+ // addRowMutations generates the mutations with VERIFIED=false
Review comment:
Is the comment with "addRowMutations" relevant here? If it is, I see that
getting called from send() - so is it modifying the unverifiedIndexMutations
iterator?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services