fsk119 commented on code in PR #21502:
URL: https://github.com/apache/flink/pull/21502#discussion_r1064355032
##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSetImpl.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;

Review Comment:
   I don't think we should expose the implementation in the sql-gateway-api package. Move it to the `org.apache.flink.table.gateway.service.result` package.

##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSetImpl.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** An implementation of {@link ResultSet}. */
+@Internal
+public class ResultSetImpl implements ResultSet {
+
+    private final ResultType resultType;
+
+    @Nullable private final Long nextToken;
+
+    private final ResolvedSchema resultSchema;
+    private final List<RowData> data;
+    @Nullable private final RowDataToStringConverter converter;

Review Comment:
   Why is this @Nullable? I think it's never null in the EOS/PAYLOAD cases.

##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSetImpl.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** An implementation of {@link ResultSet}. */
+@Internal
+public class ResultSetImpl implements ResultSet {
+
+    private final ResultType resultType;
+
+    @Nullable private final Long nextToken;
+
+    private final ResolvedSchema resultSchema;
+    private final List<RowData> data;
+    @Nullable private final RowDataToStringConverter converter;
+
+    private final boolean isQueryResult;
+
+    @Nullable private final JobID jobID;
+
+    @Nullable private final ResultKind resultKind;
+
+    public static final ResultSet NOT_READY_RESULTS =

Review Comment:
   It's a little strange that we can get a jobID or ResultKind from `NOT_READY_RESULTS`. I think we should introduce a dedicated class to represent this state and throw an exception if callers try to access the results.
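   To illustrate the suggestion, here is a minimal sketch of what such a dedicated placeholder type could look like. The class name `NotReadyResult`, the singleton shape, and the accessor names are assumptions mirroring the fields in the diff; in the actual code it would implement the `ResultSet` interface rather than stand alone:

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;

import java.util.Collections;
import java.util.List;

/** Sketch of a dedicated placeholder for results that have not been produced yet. */
public final class NotReadyResult {

    public static final NotReadyResult INSTANCE = new NotReadyResult();

    private NotReadyResult() {}

    public ResolvedSchema getResultSchema() {
        // There are no rows yet, so an empty schema is returned.
        return ResolvedSchema.of();
    }

    public List<RowData> getData() {
        return Collections.emptyList();
    }

    public JobID getJobID() {
        // Fail loudly instead of handing out a meaningless value.
        throw new UnsupportedOperationException(
                "A NOT_READY result is not attached to a job yet.");
    }

    public ResultKind getResultKind() {
        throw new UnsupportedOperationException(
                "A NOT_READY result does not carry a result kind yet.");
    }
}
```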
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -386,30 +414,51 @@ private ResultFetcher callModifyOperations(
             OperationHandle handle, List<ModifyOperation> modifyOperations) {
         TableResultInternal result = tableEnv.executeInternal(modifyOperations);
-        return new ResultFetcher(
-                handle,
-                ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
-                Collections.singletonList(
-                        GenericRowData.of(
-                                StringData.fromString(
-                                        result.getJobClient()
-                                                .orElseThrow(
-                                                        () ->
-                                                                new SqlExecutionException(
-                                                                        String.format(
-                                                                                "Can't get job client for the operation %s.",
-                                                                                handle)))
-                                                .getJobID()
-                                                .toString()))));
+        return ResultFetcher.newBuilder()
+                .operationHandle(handle)
+                .resolvedSchema(ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())))
+                .rows(
+                        Collections.singletonList(
+                                GenericRowData.of(
+                                        StringData.fromString(
+                                                result.getJobClient()
+                                                        .orElseThrow(
+                                                                () ->
+                                                                        new SqlExecutionException(
+                                                                                String.format(
+                                                                                        "Can't get job client for the operation %s.",
+                                                                                        handle)))
+                                                        .getJobID()
+                                                        .toString()))))
+                .converter(SIMPLE_ROW_DATA_TO_STRING_CONVERTER)
+                .jobID(
+                        result.getJobClient()
+                                .orElseThrow(
+                                        () ->
+                                                new SqlExecutionException(
+                                                        String.format(
+                                                                "Can't get job client for the operation %s.",
+                                                                handle)))
+                                .getJobID())
+                .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+                .build();
     }
 
     private ResultFetcher callOperation(
             TableEnvironmentInternal tableEnv, OperationHandle handle, Operation op) {
         TableResultInternal result = tableEnv.executeInternal(op);
-        return new ResultFetcher(
-                handle,
-                result.getResolvedSchema(),
-                CollectionUtil.iteratorToList(result.collectInternal()));
+        JobID jobID = null;
+        if (result.getJobClient().isPresent()) {
+            jobID = result.getJobClient().get().getJobID();
+        }

Review Comment:
   Is it actually possible to satisfy this condition? It seems we have separate paths for processing ModifyOperation and QueryOperation.

##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -456,6 +450,100 @@ public void testGetOperationSchemaUntilOperationIsReady() throws Exception {
                 task -> assertThat(task.get()).isEqualTo(getDefaultResultSet().getResultSchema()));
     }
 
+    // ---------------------------------------------------------------------------------------------
+    // Executing statement tests
+    // ---------------------------------------------------------------------------------------------

Review Comment:
   I'm inclined to move these tests to the `SqlGatewayServiceStatementITCase` side.
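   Regarding the jobID extraction questioned in the OperationExecutor hunk above: if the check is kept, the `Optional` can be collapsed into a single expression. A small sketch, assuming `result` is the `TableResultInternal` from the diff:

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.internal.TableResultInternal;

import javax.annotation.Nullable;

/** Sketch: extract the optional job id without an explicit isPresent()/get() pair. */
final class JobIdExtraction {

    @Nullable
    static JobID extractJobId(TableResultInternal result) {
        // getJobClient() returns Optional<JobClient>; map() keeps the null handling in one place.
        return result.getJobClient().map(JobClient::getJobID).orElse(null);
    }
}
```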
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -220,4 +223,111 @@ public synchronized ResultSet fetchResults(long token, int maxFetchSize) {
     public ResultStore getResultStore() {
         return resultStore;
     }
+
+    private ResultSet buildEosResultSet() {

Review Comment:
   nit: buildEosResultSet -> buildEOSResultSet

##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -220,4 +223,111 @@ public synchronized ResultSet fetchResults(long token, int maxFetchSize) {
     public ResultStore getResultStore() {
         return resultStore;
     }
+
+    private ResultSet buildEosResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.EOS)
+                .nextToken(null)
+                .resolvedSchema(resultSchema)
+                .data(Collections.emptyList())
+                .build();
+    }
+
+    private ResultSet buildPayloadResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.PAYLOAD)
+                .nextToken(currentToken)
+                .resolvedSchema(resultSchema)
+                .data(new ArrayList<>(bufferedPrevResults))
+                .converter(converter)
+                .isQueryResult(isQueryResult)
+                .jobID(jobID)
+                .resultKind(resultKind)
+                .build();
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /** Builder to build the {@link ResultFetcher}. */
+    public static class Builder {
+        private OperationHandle operationHandle;
+        private ResolvedSchema resultSchema;
+        private List<RowData> rows;
+        private CloseableIterator<RowData> rowsIterator;
+        RowDataToStringConverter converter;
+        private boolean isQueryResult = false;
+        @Nullable private JobID jobID;
+        private ResultKind resultKind;
+
+        public Builder operationHandle(OperationHandle operationHandle) {
+            this.operationHandle = operationHandle;
+            return this;
+        }
+
+        public Builder resolvedSchema(ResolvedSchema resultSchema) {
+            this.resultSchema = resultSchema;
+            return this;
+        }
+
+        public Builder rows(List<RowData> rows) {
+            Preconditions.checkState(
+                    rowsIterator == null,
+                    "Result data has been set already. Can only set either rows or rowsIterator");
+            this.rows = rows;
+            return this;
+        }
+
+        public Builder rowsIterator(CloseableIterator<RowData> rowsIterator) {
+            Preconditions.checkState(
+                    rows == null,
+                    "Result data has been set already. Can only set either rows or rowsIterator");
+            this.rowsIterator = rowsIterator;
+            return this;
+        }
+
+        public Builder converter(RowDataToStringConverter converter) {
+            this.converter = converter;
+            return this;
+        }
+
+        public Builder setIsQueryResult() {

Review Comment:
   setIsQueryResult -> queryResult

##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -54,37 +61,43 @@ public class ResultFetcher {
     private final OperationHandle operationHandle;
     private final ResolvedSchema resultSchema;
-    private final ResultStore resultStore;
+    private ResultStore resultStore;

Review Comment:
   Why is this not final?
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -386,30 +414,51 @@ private ResultFetcher callModifyOperations(
             OperationHandle handle, List<ModifyOperation> modifyOperations) {
         TableResultInternal result = tableEnv.executeInternal(modifyOperations);
-        return new ResultFetcher(
-                handle,
-                ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
-                Collections.singletonList(
-                        GenericRowData.of(
-                                StringData.fromString(
-                                        result.getJobClient()
-                                                .orElseThrow(
-                                                        () ->
-                                                                new SqlExecutionException(
-                                                                        String.format(
-                                                                                "Can't get job client for the operation %s.",
-                                                                                handle)))
-                                                .getJobID()
-                                                .toString()))));
+        return ResultFetcher.newBuilder()
+                .operationHandle(handle)
+                .resolvedSchema(ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())))
+                .rows(
+                        Collections.singletonList(
+                                GenericRowData.of(
+                                        StringData.fromString(
+                                                result.getJobClient()
+                                                        .orElseThrow(
+                                                                () ->
+                                                                        new SqlExecutionException(
+                                                                                String.format(
+                                                                                        "Can't get job client for the operation %s.",
+                                                                                        handle)))
+                                                        .getJobID()
+                                                        .toString()))))
+                .converter(SIMPLE_ROW_DATA_TO_STRING_CONVERTER)
+                .jobID(
+                        result.getJobClient()
+                                .orElseThrow(
+                                        () ->
+                                                new SqlExecutionException(
+                                                        String.format(
+                                                                "Can't get job client for the operation %s.",
+                                                                handle)))
+                                .getJobID())
+                .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+                .build();
     }
 
     private ResultFetcher callOperation(
             TableEnvironmentInternal tableEnv, OperationHandle handle, Operation op) {
         TableResultInternal result = tableEnv.executeInternal(op);
-        return new ResultFetcher(
-                handle,
-                result.getResolvedSchema(),
-                CollectionUtil.iteratorToList(result.collectInternal()));
+        JobID jobID = null;
+        if (result.getJobClient().isPresent()) {
+            jobID = result.getJobClient().get().getJobID();
+        }
+        return ResultFetcher.newBuilder()
+                .operationHandle(handle)

Review Comment:
   Most of the code here is similar. What about introducing a method named `fromTableResult`?

##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -386,30 +414,51 @@ private ResultFetcher callModifyOperations(
             OperationHandle handle, List<ModifyOperation> modifyOperations) {
         TableResultInternal result = tableEnv.executeInternal(modifyOperations);
-        return new ResultFetcher(
-                handle,
-                ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())),
-                Collections.singletonList(
-                        GenericRowData.of(
-                                StringData.fromString(
-                                        result.getJobClient()
-                                                .orElseThrow(
-                                                        () ->
-                                                                new SqlExecutionException(
-                                                                        String.format(
-                                                                                "Can't get job client for the operation %s.",
-                                                                                handle)))
-                                                .getJobID()
-                                                .toString()))));
+        return ResultFetcher.newBuilder()
+                .operationHandle(handle)
+                .resolvedSchema(ResolvedSchema.of(Column.physical(JOB_ID, DataTypes.STRING())))
+                .rows(
+                        Collections.singletonList(
+                                GenericRowData.of(
+                                        StringData.fromString(
+                                                result.getJobClient()
+                                                        .orElseThrow(
+                                                                () ->
+                                                                        new SqlExecutionException(
+                                                                                String.format(
+                                                                                        "Can't get job client for the operation %s.",
+                                                                                        handle)))
+                                                        .getJobID()
+                                                        .toString()))))
+                .converter(SIMPLE_ROW_DATA_TO_STRING_CONVERTER)

Review Comment:
   I think `SIMPLE_ROW_DATA_TO_STRING_CONVERTER` can be the default converter.
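   A minimal sketch of that default, showing only the converter-related part of the Builder from the diff; where `SIMPLE_ROW_DATA_TO_STRING_CONVERTER` lives and how it is imported is assumed here:

```java
/** Builder to build the {@link ResultFetcher} (converter-related excerpt, sketch only). */
public static class Builder {
    // Defaulting to the simple converter means non-query paths such as
    // callModifyOperations no longer need an explicit .converter(...) call.
    private RowDataToStringConverter converter = SIMPLE_ROW_DATA_TO_STRING_CONVERTER;

    public Builder converter(RowDataToStringConverter converter) {
        // Query results can still override the default with a schema-aware converter.
        this.converter = converter;
        return this;
    }
}
```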
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -220,4 +223,111 @@ public synchronized ResultSet fetchResults(long token, int maxFetchSize) {
     public ResultStore getResultStore() {
         return resultStore;
     }
+
+    private ResultSet buildEosResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.EOS)
+                .nextToken(null)
+                .resolvedSchema(resultSchema)
+                .data(Collections.emptyList())
+                .build();
+    }
+
+    private ResultSet buildPayloadResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.PAYLOAD)
+                .nextToken(currentToken)
+                .resolvedSchema(resultSchema)
+                .data(new ArrayList<>(bufferedPrevResults))
+                .converter(converter)
+                .isQueryResult(isQueryResult)
+                .jobID(jobID)
+                .resultKind(resultKind)
+                .build();
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /** Builder to build the {@link ResultFetcher}. */
+    public static class Builder {
+        private OperationHandle operationHandle;
+        private ResolvedSchema resultSchema;
+        private List<RowData> rows;
+        private CloseableIterator<RowData> rowsIterator;
+        RowDataToStringConverter converter;

Review Comment:
   Why is this not private?

##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -220,4 +223,111 @@ public synchronized ResultSet fetchResults(long token, int maxFetchSize) {
     public ResultStore getResultStore() {
         return resultStore;
     }
+
+    private ResultSet buildEosResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.EOS)
+                .nextToken(null)
+                .resolvedSchema(resultSchema)
+                .data(Collections.emptyList())
+                .build();
+    }
+
+    private ResultSet buildPayloadResultSet() {
+        return ResultSetImpl.newBuilder()
+                .resultType(ResultSet.ResultType.PAYLOAD)
+                .nextToken(currentToken)
+                .resolvedSchema(resultSchema)
+                .data(new ArrayList<>(bufferedPrevResults))
+                .converter(converter)
+                .isQueryResult(isQueryResult)
+                .jobID(jobID)
+                .resultKind(resultKind)
+                .build();
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /** Builder to build the {@link ResultFetcher}. */
+    public static class Builder {
+        private OperationHandle operationHandle;
+        private ResolvedSchema resultSchema;
+        private List<RowData> rows;
+        private CloseableIterator<RowData> rowsIterator;
+        RowDataToStringConverter converter;
+        private boolean isQueryResult = false;
+        @Nullable private JobID jobID;
+        private ResultKind resultKind;
+
+        public Builder operationHandle(OperationHandle operationHandle) {
+            this.operationHandle = operationHandle;
+            return this;
+        }
+
+        public Builder resolvedSchema(ResolvedSchema resultSchema) {
+            this.resultSchema = resultSchema;
+            return this;
+        }
+
+        public Builder rows(List<RowData> rows) {
+            Preconditions.checkState(
+                    rowsIterator == null,
+                    "Result data has been set already. Can only set either rows or rowsIterator");
+            this.rows = rows;
+            return this;
+        }
+
+        public Builder rowsIterator(CloseableIterator<RowData> rowsIterator) {
+            Preconditions.checkState(
+                    rows == null,
+                    "Result data has been set already. Can only set either rows or rowsIterator");
+            this.rowsIterator = rowsIterator;
+            return this;
+        }
+
+        public Builder converter(RowDataToStringConverter converter) {
+            this.converter = converter;
+            return this;
+        }
+
+        public Builder setIsQueryResult() {
+            this.isQueryResult = true;
+            return this;
+        }
+
+        public Builder jobID(JobID jobID) {
+            this.jobID = jobID;
+            return this;
+        }
+
+        public Builder resultKind(ResultKind resultKind) {
+            this.resultKind = resultKind;
+            return this;
+        }
+
+        public ResultFetcher build() {
+            Preconditions.checkArgument(
+                    rows != null || rowsIterator != null, "Result data has not been set.");
+
+            ResultFetcher resultFetcher =
+                    new ResultFetcher(
+                            operationHandle,
+                            resultSchema,
+                            converter,
+                            isQueryResult,
+                            jobID,
+                            resultKind);
+
+            if (rows != null) {
+                resultFetcher.resultStore = ResultStore.DUMMY_RESULT_STORE;
+                resultFetcher.bufferedResults.addAll(rows);
+            } else {
+                resultFetcher.resultStore =
+                        new ResultStore(rowsIterator, TABLE_RESULT_MAX_INITIAL_CAPACITY);
+            }

Review Comment:
   Why not introduce something like:
   ```
   public static ResultFetcher fromTableResult(
           OperationHandle operationHandle, TableResult tableResult, boolean isQuery) {}

   public static ResultFetcher fromResults(
           OperationHandle operationHandle, ResolvedSchema resolvedSchema, List<RowData> results) {}
   ```
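   To make that concrete, here is a rough sketch of how the two factories could replace most Builder call sites, reusing the constructor and fields visible in the diff. The use of `TableResultInternal` (so that `collectInternal()` is available), the `SUCCESS_WITH_CONTENT` default, and the access to `SIMPLE_ROW_DATA_TO_STRING_CONVERTER`, `DUMMY_RESULT_STORE`, and `TABLE_RESULT_MAX_INITIAL_CAPACITY` are all assumptions, not the final shape of the PR:

```java
// Sketch only: intended to live inside ResultFetcher next to the existing constructor.

public static ResultFetcher fromTableResult(
        OperationHandle operationHandle, TableResultInternal tableResult, boolean isQuery) {
    JobID jobID = tableResult.getJobClient().map(JobClient::getJobID).orElse(null);
    ResultFetcher fetcher =
            new ResultFetcher(
                    operationHandle,
                    tableResult.getResolvedSchema(),
                    // Simplification: the query path would likely want a schema-aware converter.
                    SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
                    isQuery,
                    jobID,
                    tableResult.getResultKind());
    // Streaming results are consumed lazily through the ResultStore.
    fetcher.resultStore =
            new ResultStore(tableResult.collectInternal(), TABLE_RESULT_MAX_INITIAL_CAPACITY);
    return fetcher;
}

public static ResultFetcher fromResults(
        OperationHandle operationHandle, ResolvedSchema resolvedSchema, List<RowData> results) {
    ResultFetcher fetcher =
            new ResultFetcher(
                    operationHandle,
                    resolvedSchema,
                    SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
                    false,
                    null,
                    ResultKind.SUCCESS_WITH_CONTENT);
    // Already materialized rows can be buffered directly; no live ResultStore is needed.
    fetcher.resultStore = ResultStore.DUMMY_RESULT_STORE;
    fetcher.bufferedResults.addAll(results);
    return fetcher;
}
```

   With factories along these lines, `callModifyOperations` and `callOperation` would each shrink to a single call, which would also address the duplication noted in the OperationExecutor comments above.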
