walterddr commented on code in PR #9765:
URL: https://github.com/apache/pinot/pull/9765#discussion_r1018496960
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java:
##########
@@ -70,6 +60,128 @@ public class QueryRunnerTestBase extends QueryTestSet {
protected Map<ServerInstance, QueryServerEnclosure> _servers = new
HashMap<>();
protected GrpcMailboxService _mailboxService;
+ // --------------------------------------------------------------------------
+ // QUERY UTILS
+ // --------------------------------------------------------------------------
+ protected List<Object[]> queryRunner(String sql) {
+ QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
+ Map<String, String> requestMetadataMap =
+ ImmutableMap.of("REQUEST_ID",
String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
+ MailboxReceiveOperator mailboxReceiveOperator = null;
+ for (int stageId : queryPlan.getStageMetadataMap().keySet()) {
+ if (queryPlan.getQueryStageMap().get(stageId) instanceof
MailboxReceiveNode) {
+ MailboxReceiveNode reduceNode = (MailboxReceiveNode)
queryPlan.getQueryStageMap().get(stageId);
+ mailboxReceiveOperator =
QueryDispatcher.createReduceStageOperator(_mailboxService,
+
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
+ Long.parseLong(requestMetadataMap.get("REQUEST_ID")),
reduceNode.getSenderStageId(),
+ reduceNode.getDataSchema(), "localhost", _reducerGrpcPort);
+ } else {
+ for (ServerInstance serverInstance :
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
+ DistributedStagePlan distributedStagePlan =
+ QueryDispatcher.constructDistributedStagePlan(queryPlan,
stageId, serverInstance);
+ _servers.get(serverInstance).processQuery(distributedStagePlan,
requestMetadataMap);
+ }
+ }
+ }
+ Preconditions.checkNotNull(mailboxReceiveOperator);
+ return
QueryDispatcher.toResultTable(QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator),
+ queryPlan.getQueryResultFields(),
queryPlan.getQueryStageMap().get(0).getDataSchema()).getRows();
+ }
+
+ protected List<Object[]> queryH2(String sql)
+ throws Exception {
+ Statement h2statement =
_h2Connection.createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
+ h2statement.execute(sql);
+ ResultSet h2ResultSet = h2statement.getResultSet();
+ int columnCount = h2ResultSet.getMetaData().getColumnCount();
+ List<Object[]> result = new ArrayList<>();
+ while (h2ResultSet.next()) {
+ Object[] row = new Object[columnCount];
+ for (int i = 0; i < columnCount; i++) {
+ row[i] = h2ResultSet.getObject(i + 1);
+ }
+ result.add(row);
+ }
+ return result;
+ }
+
+ protected void compareRowEquals(List<Object[]> resultRows, List<Object[]>
expectedRows) {
+ Assert.assertEquals(resultRows.size(), expectedRows.size());
+
+ Comparator<Object> valueComp = (l, r) -> {
+ if (l == null && r == null) {
+ return 0;
+ } else if (l == null) {
+ return -1;
+ } else if (r == null) {
+ return 1;
+ }
+ if (l instanceof Integer) {
+ return Integer.compare((Integer) l, ((Number) r).intValue());
+ } else if (l instanceof Long) {
+ return Long.compare((Long) l, ((Number) r).longValue());
+ } else if (l instanceof Float) {
+ return Float.compare((Float) l, ((Number) r).floatValue());
+ } else if (l instanceof Double) {
+ return Double.compare((Double) l, ((Number) r).doubleValue());
+ } else if (l instanceof String) {
+ return ((String) l).compareTo((String) r);
+ } else if (l instanceof Boolean) {
+ return ((Boolean) l).compareTo((Boolean) r);
+ } else {
+ throw new RuntimeException("non supported type " + l.getClass());
+ }
+ };
+ Comparator<Object[]> rowComp = (l, r) -> {
+ int cmp = 0;
+ for (int i = 0; i < l.length; i++) {
+ cmp = valueComp.compare(l[i], r[i]);
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+ return 0;
+ };
+ resultRows.sort(rowComp);
+ expectedRows.sort(rowComp);
+ for (int i = 0; i < resultRows.size(); i++) {
+ Object[] resultRow = resultRows.get(i);
+ Object[] expectedRow = expectedRows.get(i);
+ for (int j = 0; j < resultRow.length; j++) {
+ Assert.assertEquals(valueComp.compare(resultRow[j], expectedRow[j]), 0,
+ "Not match at (" + i + "," + j + ")! Expected: " + expectedRow[j]
+ " Actual: " + resultRow[j]);
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------
+ // TEST CASES PREP
+ // --------------------------------------------------------------------------
+ protected Schema constructSchema(String schemaName, Map<String, String>
dataTypes) {
+ Schema.SchemaBuilder builder = new Schema.SchemaBuilder();
+ for (Map.Entry<String, String> dataType : dataTypes.entrySet()) {
+ builder.addSingleValueDimension(dataType.getKey(),
FieldSpec.DataType.valueOf(dataType.getValue()));
+ }
+ // ts is built in
+ builder.addDateTime("ts", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH",
"1:HOURS");
+ builder.setSchemaName(schemaName);
+ return builder.build();
+ }
+
+ protected List<GenericRow> toRow(List<List<Object>> value) {
+ List<GenericRow> result = new ArrayList<>(value.size());
+ for (int rowId = 0; rowId < value.size(); rowId++) {
+ GenericRow row = new GenericRow();
+ List<Object> rawRow = value.get(rowId);
+ for (int colId = 0; colId < rawRow.size(); colId++) {
+ row.putValue("col" + colId, rawRow.get(colId));
+ }
+ row.putValue("ts", System.currentTimeMillis());
Review Comment:
let me remove ts entirely. this is b/c we don't have REALTIME/OFFLINE split
for now. and we don't want to mix this one in. let me remove
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]