This is an automated email from the ASF dual-hosted git repository.

richardantal pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new 282427d PHOENIX-6456 Support query logging for DDL and DML 282427d is described below commit 282427dd55aa0a7635863541b2ce859c1768d2d1 Author: Richard Antal <antal97rich...@gmail.com> AuthorDate: Thu Apr 29 20:16:08 2021 +0200 PHOENIX-6456 Support query logging for DDL and DML Change-Id: Ifd214f41cf2f5e8926e5d01792b4a09da9bd18e9 --- .../org/apache/phoenix/end2end/AuditLoggingIT.java | 248 +++++++++++++++++++++ .../org/apache/phoenix/jdbc/PhoenixConnection.java | 7 + .../apache/phoenix/jdbc/PhoenixEmbeddedDriver.java | 2 +- .../phoenix/jdbc/PhoenixPreparedStatement.java | 4 +- .../org/apache/phoenix/jdbc/PhoenixStatement.java | 105 +++++++-- .../org/apache/phoenix/log/AuditQueryLogger.java | 119 ++++++++++ ...andler.java => QueryLogDetailsWorkHandler.java} | 28 +-- .../java/org/apache/phoenix/log/QueryLogger.java | 10 +- .../apache/phoenix/log/QueryLoggerDisruptor.java | 25 ++- .../org/apache/phoenix/log/TableLogWriter.java | 4 + .../org/apache/phoenix/query/QueryServices.java | 2 + .../apache/phoenix/query/QueryServicesOptions.java | 1 + 12 files changed, 505 insertions(+), 50 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AuditLoggingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AuditLoggingIT.java new file mode 100644 index 0000000..e83ff90 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AuditLoggingIT.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import org.apache.phoenix.log.LogLevel; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.junit.Test; + +import java.sql.*; +import java.util.Properties; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +public class AuditLoggingIT extends ParallelStatsDisabledIT { + + @Test + public void testEmptyLogging() throws Exception { + String createqQery = "create table test1 (mykey integer not null primary key," + + " mycolumn varchar)"; + String upsertQuery = "upsert into test1 values (1,'Hello')"; + String selectQuery = "select * from test1"; + String getLogsQuery = "select * from SYSTEM.LOG WHERE TABLE_NAME='TEST1' order by start_time"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + try { + Statement stmt = conn.createStatement(); + stmt.execute(createqQery); + stmt.execute(upsertQuery); + stmt.executeQuery(selectQuery); + conn.commit(); + + ResultSet rs = stmt.executeQuery(getLogsQuery); + assertFalse(rs.next()); + } finally { + conn.close(); + } + } + + @Test + public void testLoggingSelect() throws Exception { + String createqQery = "create table test2 (mykey integer not null primary key," + + " mycolumn varchar)"; + String upsertQuery = "upsert into test2 values (1,'Hello')"; + String 
selectQuery = "select * from test2"; + String getLogsQuery = "select * from SYSTEM.LOG WHERE TABLE_NAME='TEST2' order by start_time"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.LOG_LEVEL, LogLevel.TRACE.name()); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + try { + conn.createStatement().execute(createqQery); + conn.createStatement().execute(upsertQuery); + ResultSet rs = conn.createStatement().executeQuery(selectQuery); + assertTrue(rs.next()); + assertFalse(rs.next()); + rs.close(); + + ResultSet rs2 = conn.createStatement().executeQuery(getLogsQuery); + assertTrue(rs2.next()); + assertEquals(rs2.getString(7), selectQuery); + assertFalse(rs2.next()); + } finally { + conn.close(); + } + } + + @Test + public void testLoggingDMLAandDDL() throws Exception { + String createqQery = "create table test3 (mykey integer not null primary key," + + " mycolumn varchar)"; + String upsertQuery = "upsert into test3 values (1,'Hello')"; + String selectQuery = "select * from test3"; + String getLogsQuery = "select * from SYSTEM.LOG WHERE TABLE_NAME='TEST3' order by start_time"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.AUDIT_LOG_LEVEL, LogLevel.INFO.name()); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + try { + conn.createStatement().execute(createqQery); + conn.createStatement().execute(upsertQuery); + ResultSet rs = conn.createStatement().executeQuery(selectQuery); + assertTrue(rs.next()); + assertFalse(rs.next()); + rs.close(); + + ResultSet rs2 = conn.createStatement().executeQuery(getLogsQuery); + assertTrue(rs2.next()); + assertEquals(rs2.getString(7), createqQery); + assertTrue(rs2.next()); + assertEquals(rs2.getString(7), upsertQuery); + + assertFalse(rs2.next()); + } finally { + conn.close(); + } + } + + @Test + public void testLoggingDMLAandDDLandSelect() throws 
Exception { + String createqQery = "create table test4 (mykey integer not null primary key," + + " mycolumn varchar)"; + String upsertQuery = "upsert into test4 values (1,'Hello')"; + String selectQuery = "select * from test4"; + String getLogsQuery = "select * from SYSTEM.LOG WHERE TABLE_NAME='TEST4' order by start_time"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.AUDIT_LOG_LEVEL, LogLevel.INFO.name()); + props.setProperty(QueryServices.LOG_LEVEL, LogLevel.TRACE.name()); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + try { + Statement stat = conn.createStatement(); + stat.execute(createqQery); + stat.execute(upsertQuery); + ResultSet rs = stat.executeQuery(selectQuery); + assertTrue(rs.next()); + assertFalse(rs.next()); + rs.close(); + + ResultSet rs2 = conn.createStatement().executeQuery(getLogsQuery); + assertTrue(rs2.next()); + assertEquals(rs2.getString(7), createqQery); + assertTrue(rs2.next()); + assertEquals(rs2.getString(7), upsertQuery); + assertTrue(rs2.next()); + assertEquals(rs2.getString(7), selectQuery); + + assertFalse(rs2.next()); + + } finally { + conn.close(); + } + } + + @Test + public void testLogginParameterizedUpsert() throws Exception { + String createqQery = "create table test5 (mykey integer not null primary key," + + " mycolumn varchar)"; + String upsertQuery = "upsert into test5 values (?, ?)"; + String selectQuery = "select * from test5"; + String getLogsQuery = "select * from SYSTEM.LOG WHERE TABLE_NAME='TEST5' order by start_time"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.AUDIT_LOG_LEVEL, LogLevel.INFO.name()); + props.setProperty(QueryServices.LOG_LEVEL, LogLevel.TRACE.name()); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + try { + Statement stat = conn.createStatement(); + stat.execute(createqQery); + + + PreparedStatement p = 
conn.prepareStatement(upsertQuery); + p.setInt(1, 1); + p.setString(2, "foo"); + + p.execute(); + + p.setInt(1, 2); + p.setString(2, "bar"); + + p.execute(); + + ResultSet rs = stat.executeQuery(selectQuery); + assertTrue(rs.next()); + assertTrue(rs.next()); + assertFalse(rs.next()); + rs.close(); + + ResultSet rs2 = conn.createStatement().executeQuery(getLogsQuery); + assertTrue(rs2.next()); + assertTrue(rs2.next()); + assertEquals("1,foo", rs2.getString(13)); + assertTrue(rs2.next()); + assertEquals( "2,bar", rs2.getString(13)); + assertTrue(rs2.next()); + assertFalse(rs2.next()); + + } finally { + conn.close(); + } + } + + @Test + public void testlogSamplingRate() throws Exception { + String createqQery = "create table test6 (mykey integer not null primary key," + + " mycolumn varchar)"; + + String selectQuery = "select * from test6"; + String getLogsQuery = "select * from SYSTEM.LOG WHERE TABLE_NAME='TEST6' order by start_time"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.AUDIT_LOG_LEVEL, LogLevel.INFO.name()); + props.setProperty(QueryServices.LOG_LEVEL, LogLevel.TRACE.name()); + props.setProperty(QueryServices.LOG_SAMPLE_RATE, "0.5"); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + try { + Statement stat = conn.createStatement(); + stat.execute(createqQery); + String upsertQuery; + for (int i = 0; i<100; i++) { + upsertQuery = "upsert into test6 values (" + i + ",'asd')"; + stat.execute(upsertQuery); + ResultSet rs = stat.executeQuery(selectQuery); + assertTrue(rs.next()); + rs.close(); + } + + ResultSet rs2 = conn.createStatement().executeQuery(getLogsQuery); + int numOfUpserts = 0; + int numOfSelects = 0; + while (rs2.next()) { + String query = rs2.getString(7); + if (query.equals(selectQuery)) { + numOfSelects++; + } + else if (query.contains("upsert into test6 values (")) { + numOfUpserts++; + } + } + assertEquals(numOfUpserts, 100); + 
assertTrue(numOfSelects > 0 && numOfSelects < 100); + System.out.println(numOfSelects); + + } finally { + conn.close(); + } + } + +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index ddc1dd3..f64c6b3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -172,6 +172,7 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea private TableResultIteratorFactory tableResultIteratorFactory; private boolean isRunningUpgrade; private LogLevel logLevel; + private LogLevel auditLogLevel; private Double logSamplingRate; private String sourceOfOperation; @@ -381,6 +382,8 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea }; this.logLevel= LogLevel.valueOf(this.services.getProps().get(QueryServices.LOG_LEVEL, QueryServicesOptions.DEFAULT_LOGGING_LEVEL)); + this.auditLogLevel= LogLevel.valueOf(this.services.getProps().get(QueryServices.AUDIT_LOG_LEVEL, + QueryServicesOptions.DEFAULT_AUDIT_LOGGING_LEVEL)); this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info, this.services.getProps()); this.mutationState = mutationState == null ? 
newMutationState(maxSize, @@ -1358,6 +1361,10 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea public LogLevel getLogLevel(){ return this.logLevel; } + + public LogLevel getAuditLogLevel(){ + return this.auditLogLevel; + } public Double getLogSamplingRate(){ return this.logSamplingRate; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java index 5e26121..6b6c610 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java @@ -347,7 +347,7 @@ public abstract class PhoenixEmbeddedDriver implements Driver, SQLCloseable { } if(principal == null){ if (!isConnectionless) { - principal = props.get(QueryServices.HBASE_CLIENT_PRINCIPAL); + principal = props.get(QueryServices.HBASE_CLIENT_PRINCIPAL); } } if(keytab == null){ diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java index b54efc8..59d8add 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java @@ -170,7 +170,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar .build().buildException(); } if (statement.getOperation().isMutation()) { - executeMutation(statement); + executeMutation(statement, createAuditQueryLogger(statement,query)); return false; } executeQuery(statement, createQueryLogger(statement,query)); @@ -203,7 +203,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH) .build().buildException(); } - return executeMutation(statement); + return 
executeMutation(statement, createAuditQueryLogger(statement,query)); } public QueryPlan optimizeQuery() throws SQLException { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index e39393e..3d748e8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -89,6 +89,7 @@ import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.iterate.MaterializedResultIterator; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.log.AuditQueryLogger; import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.log.QueryLogInfo; import org.apache.phoenix.log.QueryLogger; @@ -380,12 +381,37 @@ public class PhoenixStatement implements Statement, SQLCloseable { throw new IllegalStateException(); // Can't happen as Throwables.propagate() always throws } } - - protected int executeMutation(final CompilableStatement stmt) throws SQLException { - return executeMutation(stmt, true); + + public String getTargetForAudit(CompilableStatement stmt) { + String target = null; + try { + if (stmt instanceof ExecutableUpsertStatement) { + return ((ExecutableUpsertStatement) stmt).getTable().getName().toString(); + } else if (stmt instanceof ExecutableDeleteStatement) { + return ((ExecutableDeleteStatement) stmt).getTable().getName().toString(); + } else if (stmt instanceof ExecutableCreateTableStatement) { + target = ((ExecutableCreateTableStatement)stmt).getTableName().toString(); + } else if (stmt instanceof ExecutableDropTableStatement) { + target = ((ExecutableDropTableStatement)stmt).getTableName().toString(); + } else if (stmt instanceof ExecutableAddColumnStatement) { + target = ((ExecutableAddColumnStatement)stmt).getTable().getName().toString(); + } else if 
(stmt instanceof ExecutableCreateSchemaStatement) { + return ((ExecutableCreateSchemaStatement) stmt).getSchemaName(); + } else if (stmt instanceof ExecutableDropSchemaStatement) { + target = ((ExecutableDropSchemaStatement)stmt).getSchemaName(); + } + } catch (Exception e) { + target = stmt.getClass().getName(); + } + return target; + } + + + protected int executeMutation(final CompilableStatement stmt, final AuditQueryLogger queryLogger) throws SQLException { + return executeMutation(stmt, true, queryLogger); } - private int executeMutation(final CompilableStatement stmt, final boolean doRetryOnMetaNotFoundError) throws SQLException { + private int executeMutation(final CompilableStatement stmt, final boolean doRetryOnMetaNotFoundError, final AuditQueryLogger queryLogger) throws SQLException { if (connection.isReadOnly()) { throw new SQLExceptionInfo.Builder( SQLExceptionCode.READ_ONLY_CONNECTION). @@ -426,6 +452,13 @@ public class PhoenixStatement implements Statement, SQLCloseable { setLastUpdateCount(lastUpdateCount); setLastUpdateOperation(stmt.getOperation()); connection.incrementStatementExecutionCounter(); + if(queryLogger.isAuditLoggingEnabled()) { + queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt)); + queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.COMPLETED.toString()); + queryLogger.log(QueryLogInfo.NO_OF_RESULTS_ITERATED_I, lastUpdateCount); + queryLogger.syncAudit(); + } + return lastUpdateCount; } //Force update cache and retry if meta not found error occurs @@ -436,7 +469,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { +" data from server"); if(new MetaDataClient(connection).updateCache(connection.getTenantId(), e.getSchemaName(), e.getTableName(), true).wasUpdated()){ - return executeMutation(stmt, false); + return executeMutation(stmt, false, queryLogger); } } throw e; @@ -452,6 +485,12 @@ public class PhoenixStatement implements Statement, SQLCloseable { }, PhoenixContextExecutor.inContext(), 
Tracing.withTracing(connection, this.toString())); } catch (Exception e) { + if(queryLogger.isAuditLoggingEnabled()) { + queryLogger.log(QueryLogInfo.TABLE_NAME_I, getTargetForAudit(stmt)); + queryLogger.log(QueryLogInfo.EXCEPTION_TRACE_I, Throwables.getStackTraceAsString(e)); + queryLogger.log(QueryLogInfo.QUERY_STATUS_I, QueryStatus.FAILED.toString()); + queryLogger.syncAudit(); + } Throwables.propagateIfInstanceOf(e, SQLException.class); Throwables.propagate(e); throw new IllegalStateException(); // Can't happen as Throwables.propagate() always throws @@ -1856,25 +1895,47 @@ public class PhoenixStatement implements Statement, SQLCloseable { return compileMutation(stmt, sql); } + public boolean isSystemTable(CompilableStatement stmt) { + boolean systemTable = false; + TableName tableName = null; + if (stmt instanceof ExecutableSelectStatement) { + TableNode from = ((ExecutableSelectStatement)stmt).getFrom(); + if(from instanceof NamedTableNode) { + tableName = ((NamedTableNode)from).getName(); + } + } else if (stmt instanceof ExecutableUpsertStatement) { + tableName = ((ExecutableUpsertStatement)stmt).getTable().getName(); + } else if (stmt instanceof ExecutableDeleteStatement) { + tableName = ((ExecutableDeleteStatement)stmt).getTable().getName(); + } else if (stmt instanceof ExecutableAddColumnStatement) { + tableName = ((ExecutableAddColumnStatement)stmt).getTable().getName(); + } + + if (tableName != null && PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA + .equals(tableName.getSchemaName())) { + systemTable = true; + } + + return systemTable; + } + public QueryLogger createQueryLogger(CompilableStatement stmt, String sql) throws SQLException { if (connection.getLogLevel() == LogLevel.OFF) { return QueryLogger.NO_OP_INSTANCE; } - boolean isSystemTable=false; - if(stmt instanceof ExecutableSelectStatement){ - TableNode from = ((ExecutableSelectStatement)stmt).getFrom(); - if(from instanceof NamedTableNode){ - String schemaName = 
((NamedTableNode)from).getName().getSchemaName(); - if(schemaName==null){ - schemaName=connection.getSchema(); - } - if(PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA.equals(schemaName)){ - isSystemTable=true; - } - } + QueryLogger queryLogger = QueryLogger.getInstance(connection, isSystemTable(stmt)); + QueryLoggerUtil.logInitialDetails(queryLogger, connection.getTenantId(), + connection.getQueryServices(), sql, getParameters()); + return queryLogger; + } + + public AuditQueryLogger createAuditQueryLogger(CompilableStatement stmt, String sql) throws SQLException { + if (connection.getAuditLogLevel() == LogLevel.OFF) { + return AuditQueryLogger.NO_OP_INSTANCE; } - QueryLogger queryLogger = QueryLogger.getInstance(connection,isSystemTable); + + AuditQueryLogger queryLogger = AuditQueryLogger.getInstance(connection, isSystemTable(stmt)); QueryLoggerUtil.logInitialDetails(queryLogger, connection.getTenantId(), connection.getQueryServices(), sql, getParameters()); return queryLogger; @@ -1891,7 +1952,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { if (stmt.getOperation().isMutation()) { throw new ExecuteQueryNotApplicableException(sql); } - return executeQuery(stmt,createQueryLogger(stmt,sql)); + return executeQuery(stmt, createQueryLogger(stmt, sql)); } @Override @@ -1904,7 +1965,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH) .build().buildException(); } - int updateCount = executeMutation(stmt); + int updateCount = executeMutation(stmt, createAuditQueryLogger(stmt, sql)); flushIfNecessary(); return updateCount; } @@ -1923,12 +1984,12 @@ public class PhoenixStatement implements Statement, SQLCloseable { throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH) .build().buildException(); } - executeMutation(stmt); + executeMutation(stmt, createAuditQueryLogger(stmt, sql)); flushIfNecessary(); return 
false; } - executeQuery(stmt,createQueryLogger(stmt,sql)); + executeQuery(stmt, createQueryLogger(stmt, sql)); return true; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/AuditQueryLogger.java b/phoenix-core/src/main/java/org/apache/phoenix/log/AuditQueryLogger.java new file mode 100644 index 0000000..8e4fc51 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/AuditQueryLogger.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.log; + +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.monitoring.MetricType; + +import java.util.Map; + + +/* + * Wrapper for query translator + */ +public class AuditQueryLogger extends QueryLogger { + private LogLevel auditLogLevel; + + + private AuditQueryLogger(PhoenixConnection connection) { + super(connection); + auditLogLevel = connection.getAuditLogLevel(); + + } + + private AuditQueryLogger() { + super(); + auditLogLevel = LogLevel.OFF; + } + + public static final AuditQueryLogger NO_OP_INSTANCE = new AuditQueryLogger() { + @Override + public void log(QueryLogInfo queryLogInfo, Object info) { + + } + + @Override + public boolean isDebugEnabled() { + return false; + } + + @Override + public boolean isInfoEnabled() { + return false; + } + + @Override + public void sync( + Map<String, Map<MetricType, Long>> readMetrics, Map<MetricType, Long> overAllMetrics) { + + } + + @Override + public void syncAudit( + Map<String, Map<MetricType, Long>> readMetrics, Map<MetricType, Long> overAllMetrics) { + + } + + @Override + public boolean isSynced(){ + return true; + } + }; + + public static AuditQueryLogger getInstance(PhoenixConnection connection, boolean isSystemTable) { + if (connection.getAuditLogLevel() == LogLevel.OFF || isSystemTable) { + return NO_OP_INSTANCE; + } + return new AuditQueryLogger(connection); + } + + + /** + * Is audit logging currently enabled? + * Call this method to prevent having to perform expensive operations (for example, + * String concatenation) when the audit log level is more than info. + */ + public boolean isAuditLoggingEnabled(){ + return isAuditLevelEnabled(LogLevel.INFO); + } + + private boolean isAuditLevelEnabled(LogLevel logLevel){ + return this.auditLogLevel != null && logLevel != LogLevel.OFF ? 
logLevel.ordinal() <= this.auditLogLevel.ordinal() + : false; + } + + + + public void sync(Map<String, Map<MetricType, Long>> readMetrics, Map<MetricType, Long> overAllMetrics) { + syncBase(readMetrics, overAllMetrics, auditLogLevel); + } + + public void syncAudit() { + syncAudit(null, null); + } + + /** + * We force LogLevel.TRACE here because in QueryLogInfo the minimum LogLevel for + * TABLE_NAME_I is Debug and for BIND_PARAMETERS_I is TRACE and we would like to see + * these parameters even in INFO level when using DDL and DML operations. + */ + public void syncAudit(Map<String, Map<MetricType, Long>> readMetrics, Map<MetricType, Long> overAllMetrics) { + syncBase(readMetrics, overAllMetrics, LogLevel.TRACE); + } + +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsWorkHandler.java similarity index 65% rename from phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java rename to phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsWorkHandler.java index ee6b2d6..82d30a2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsEventHandler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogDetailsWorkHandler.java @@ -17,36 +17,29 @@ */ package org.apache.phoenix.log; -import java.sql.SQLException; - +import com.lmax.disruptor.LifecycleAware; +import com.lmax.disruptor.WorkHandler; import org.apache.hadoop.conf.Configuration; -import com.lmax.disruptor.LifecycleAware; -import com.lmax.disruptor.Sequence; -import com.lmax.disruptor.SequenceReportingEventHandler; + +public class QueryLogDetailsWorkHandler implements WorkHandler<RingBufferEvent>, LifecycleAware { -public class QueryLogDetailsEventHandler implements SequenceReportingEventHandler<RingBufferEvent>, LifecycleAware { - private Sequence sequenceCallback; private LogWriter logWriter; - public 
QueryLogDetailsEventHandler(Configuration configuration) throws SQLException{ + public QueryLogDetailsWorkHandler(Configuration configuration) { this.logWriter = new TableLogWriter(configuration); } - - @Override - public void setSequenceCallback(final Sequence sequenceCallback) { - this.sequenceCallback = sequenceCallback; - } @Override - public void onEvent(final RingBufferEvent event, final long sequence, final boolean endOfBatch) throws Exception { - logWriter.write(event); - event.clear(); + public void onEvent(RingBufferEvent ringBufferEvent) throws Exception { + logWriter.write(ringBufferEvent); + ringBufferEvent.clear(); } @Override public void onStart() { + } @Override @@ -59,5 +52,4 @@ public class QueryLogDetailsEventHandler implements SequenceReportingEventHandle //Ignore } } - -} +} \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java index 27d4ba4..bb7d270 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLogger.java @@ -43,15 +43,15 @@ public class QueryLogger { private boolean isSynced; private static final Logger LOGGER = LoggerFactory.getLogger(QueryLogger.class); - private QueryLogger(PhoenixConnection connection) { + protected QueryLogger(PhoenixConnection connection) { this.queryId = UUID.randomUUID().toString(); this.queryDisruptor = connection.getQueryServices().getQueryDisruptor(); logLevel = connection.getLogLevel(); log(QueryLogInfo.QUERY_ID_I, queryId); log(QueryLogInfo.START_TIME_I, EnvironmentEdgeManager.currentTimeMillis()); } - - private QueryLogger() { + + protected QueryLogger() { logLevel = LogLevel.OFF; } @@ -155,6 +155,10 @@ public class QueryLogger { public void sync(Map<String, Map<MetricType, Long>> readMetrics, Map<MetricType, Long> overAllMetrics) { + syncBase(readMetrics, overAllMetrics, logLevel); + } + + public 
void syncBase(Map<String, Map<MetricType, Long>> readMetrics, Map<MetricType, Long> overAllMetrics, LogLevel logLevel) { if (!isSynced) { isSynced = true; final RingBufferEventTranslator translator = getCachedTranslator(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java index 1d3ebc9..03df4cc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/QueryLoggerDisruptor.java @@ -23,7 +23,6 @@ import java.sql.SQLException; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.query.QueryServices; import org.slf4j.Logger; @@ -46,6 +45,7 @@ public class QueryLoggerDisruptor implements Closeable{ private static final int RING_BUFFER_SIZE = 8 * 1024; private static final Logger LOGGER = LoggerFactory.getLogger(QueryLoggerDisruptor.class); private static final String DEFAULT_WAIT_STRATEGY = BlockingWaitStrategy.class.getName(); + private static final int DEFAULT_AUDIT_LOGGER_PROCESS_COUNT = 1; public QueryLoggerDisruptor(Configuration configuration) throws SQLException{ WaitStrategy waitStrategy; @@ -74,12 +74,29 @@ public class QueryLoggerDisruptor implements Closeable{ final ExceptionHandler<RingBufferEvent> errorHandler = new QueryLoggerDefaultExceptionHandler(); disruptor.setDefaultExceptionHandler(errorHandler); - final QueryLogDetailsEventHandler[] handlers = { new QueryLogDetailsEventHandler(configuration) }; - disruptor.handleEventsWith(handlers); + /** + * if LOG_HANDLER_COUNT is 1 it will work as the previous implementation + * if LOG_HANDLER_COUNT is 2 or more then Multi Thread + */ + int handlerCount = configuration.getInt( + QueryServices.LOG_HANDLER_COUNT, DEFAULT_AUDIT_LOGGER_PROCESS_COUNT); + + if (handlerCount <= 
0){ + LOGGER.error("Audit Log Handler Count must be greater than 0." + + "change to default value, input : " + handlerCount); + handlerCount = DEFAULT_AUDIT_LOGGER_PROCESS_COUNT; + } + + QueryLogDetailsWorkHandler[] workHandlers = new QueryLogDetailsWorkHandler[handlerCount]; + for (int i = 0; i < handlerCount; i++){ + workHandlers[i] = new QueryLogDetailsWorkHandler(configuration); + } + disruptor.handleEventsWithWorkerPool(workHandlers); + LOGGER.info("Starting QueryLoggerDisruptor for with ringbufferSize=" + disruptor.getRingBuffer().getBufferSize() + ", waitStrategy=" + waitStrategy.getClass().getSimpleName() + ", " + "exceptionHandler=" - + errorHandler + "..."); + + errorHandler + ", handlerCount=" + handlerCount); disruptor.start(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java index fbc6b2d..f325666 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/log/TableLogWriter.java @@ -96,6 +96,10 @@ public class TableLogWriter implements LogWriter { } } + if (connection.isReadOnly()) { + return; + } + ImmutableMap<QueryLogInfo, Object> queryInfoMap = event.getQueryInfo(); for (QueryLogInfo info : QueryLogInfo.values()) { if (queryInfoMap.containsKey(info) && info.logLevel.ordinal() <= event.getConnectionLogLevel().ordinal()) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index cc83d16..6b85e79 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -309,9 +309,11 @@ public interface QueryServices extends SQLCloseable { public static final String WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB = "phoenix.query.wildcard.dynamicColumns"; public static final String LOG_LEVEL 
= "phoenix.log.level"; + public static final String AUDIT_LOG_LEVEL = "phoenix.audit.log.level"; public static final String LOG_BUFFER_SIZE = "phoenix.log.buffer.size"; public static final String LOG_BUFFER_WAIT_STRATEGY = "phoenix.log.wait.strategy"; public static final String LOG_SAMPLE_RATE = "phoenix.log.sample.rate"; + public static final String LOG_HANDLER_COUNT = "phoenix.log.handler.count"; public static final String SYSTEM_CATALOG_SPLITTABLE = "phoenix.system.catalog.splittable"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 95fb801..58284a7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -362,6 +362,7 @@ public class QueryServicesOptions { public static final boolean DEFAULT_COST_BASED_OPTIMIZER_ENABLED = false; public static final boolean DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB = false; public static final String DEFAULT_LOGGING_LEVEL = LogLevel.OFF.name(); + public static final String DEFAULT_AUDIT_LOGGING_LEVEL = LogLevel.OFF.name(); public static final String DEFAULT_LOG_SAMPLE_RATE = "1.0"; public static final int DEFAULT_LOG_SALT_BUCKETS = 32; public static final int DEFAULT_SALT_BUCKETS = 0;