gjacoby126 commented on a change in pull request #801:
URL: https://github.com/apache/phoenix/pull/801#discussion_r446770594
##########
File path:
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
##########
@@ -108,6 +109,12 @@ protected QueryPlan getQueryPlan(final JobContext context,
final Configuration c
scan.setAttribute(BaseScannerRegionObserver.INDEX_RETRY_VERIFY,
Bytes.toBytes(lastVerifyTimeValue));
scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_DISABLE_LOGGING_VERIFY_TYPE,
getDisableLoggingVerifyType(configuration).toBytes());
+ String shouldLogMaxLookbackOutput =
+
configuration.get(IndexRebuildRegionScanner.INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY);
Review comment:
Fixed
##########
File path:
phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
##########
@@ -1006,6 +1011,72 @@ public void testDisableOutputLogging() throws Exception {
}
}
+ @Test
+ public void testEnableOutputLoggingForMaxLookback() throws Exception {
+ //by default we don't log invalid or missing rows past max lookback
age to the
+ // PHOENIX_INDEX_TOOL table. Verify that we can flip that logging on
from the client-side
+ // using a system property (such as from the command line) and have it
log rows on the
+ // server-side
+ if (!mutable) {
+ return;
+ }
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName,
dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName,
indexTableName);
+
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+ injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ deleteAllRows(conn,
+
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
+ String stmString1 =
+ "CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP
INTEGER) ";
+ conn.createStatement().execute(stmString1);
+
+ injectEdge.incrementValue(1L);
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?,
?)", dataTableFullName);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+ // insert two rows
+ IndexToolIT.upsertRow(stmt1, 1);
+ IndexToolIT.upsertRow(stmt1, 2);
+ conn.commit();
+ injectEdge.incrementValue(1L); //we have to increment time to see
our writes
+ //now create an index async so it won't have the two rows in the
base table
+
+ String stmtString2 =
+ String.format(
+ "CREATE INDEX %s ON %s (LPAD(UPPER(NAME,
'en_US'),8,'x')||'_xyz') ASYNC ",
+ indexTableName, dataTableFullName);
+ conn.createStatement().execute(stmtString2);
+ conn.commit();
+ injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+ deleteAllRows(conn,
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
+ getUtility().getConfiguration().
+
set(IndexRebuildRegionScanner.INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
"true");
Review comment:
Fixed
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]