This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f477a43ff23576cd2e1f8c632f78458948245df4 Author: Shengkai <[email protected]> AuthorDate: Sun Jul 24 15:23:46 2022 +0800 [FLINK-28152][sql-gateway][hive] Support GetOperationStatus and GetResultSetMetadata for HiveServer2Endpoint --- .../table/endpoint/hive/HiveServer2Endpoint.java | 51 +++++- .../hive/util/ThriftObjectConversions.java | 186 ++++++++++++++++++++- .../hive/util/ThriftObjectConversionsTest.java | 139 +++++++++++++++ .../flink/table/gateway/api/SqlGatewayService.java | 11 ++ .../table/gateway/api/results/OperationInfo.java | 26 ++- .../gateway/api/utils/MockedSqlGatewayService.java | 8 + .../gateway/service/SqlGatewayServiceImpl.java | 15 ++ .../service/operation/OperationManager.java | 22 ++- .../gateway/service/result/ResultFetcher.java | 4 + .../gateway/service/SqlGatewayServiceITCase.java | 8 +- .../service/SqlGatewayServiceStatementITCase.java | 4 - 11 files changed, 449 insertions(+), 25 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java index 262177a547e..2595860de6b 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java @@ -22,16 +22,21 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.client.HiveShimLoader; import org.apache.flink.table.gateway.api.SqlGatewayService; import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint; 
+import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.operation.OperationStatus; +import org.apache.flink.table.gateway.api.results.OperationInfo; import org.apache.flink.table.gateway.api.session.SessionEnvironment; import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import org.apache.flink.table.gateway.api.utils.ThreadUtils; import org.apache.flink.table.module.Module; import org.apache.flink.table.module.hive.HiveModule; +import org.apache.flink.util.ExceptionUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.service.rpc.thrift.TCLIService; @@ -75,6 +80,7 @@ import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq; import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp; import org.apache.hive.service.rpc.thrift.TOpenSessionReq; import org.apache.hive.service.rpc.thrift.TOpenSessionResp; +import org.apache.hive.service.rpc.thrift.TOperationHandle; import org.apache.hive.service.rpc.thrift.TProtocolVersion; import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq; import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp; @@ -107,9 +113,12 @@ import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIA import static org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V10; import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase; import static org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize; +import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toOperationHandle; import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle; +import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationState; import static 
org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle; import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTStatus; +import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTTableSchema; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -218,9 +227,10 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin this.catalogName = checkNotNull(catalogName); this.hiveConfPath = hiveConfPath; this.defaultDatabase = defaultDatabase; - this.allowEmbedded = allowEmbedded; this.moduleName = moduleName; + + this.allowEmbedded = allowEmbedded; } @Override @@ -375,7 +385,27 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin @Override public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq tGetOperationStatusReq) throws TException { - throw new UnsupportedOperationException(ERROR_MESSAGE); + TGetOperationStatusResp resp = new TGetOperationStatusResp(); + try { + TOperationHandle operationHandle = tGetOperationStatusReq.getOperationHandle(); + OperationInfo operationInfo = + service.getOperationInfo( + toSessionHandle(operationHandle), toOperationHandle(operationHandle)); + resp.setStatus(OK_STATUS); + // TODO: support completed time / start time + resp.setOperationState(toTOperationState(operationInfo.getStatus())); + // Currently, all operations have results. 
+ resp.setHasResultSet(true); + if (operationInfo.getStatus().equals(OperationStatus.ERROR) + && operationInfo.getException().isPresent()) { + resp.setErrorMessage( + ExceptionUtils.stringifyException(operationInfo.getException().get())); + } + } catch (Throwable t) { + LOG.error("Failed to GetOperationStatus.", t); + resp.setStatus(toTStatus(t)); + } + return resp; } @Override @@ -393,7 +423,22 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin @Override public TGetResultSetMetadataResp GetResultSetMetadata( TGetResultSetMetadataReq tGetResultSetMetadataReq) throws TException { - throw new UnsupportedOperationException(ERROR_MESSAGE); + TGetResultSetMetadataResp resp = new TGetResultSetMetadataResp(); + try { + SessionHandle sessionHandle = + toSessionHandle(tGetResultSetMetadataReq.getOperationHandle()); + OperationHandle operationHandle = + toOperationHandle(tGetResultSetMetadataReq.getOperationHandle()); + ResolvedSchema schema = + service.getOperationResultSchema(sessionHandle, operationHandle); + + resp.setStatus(OK_STATUS); + resp.setSchema(toTTableSchema(schema)); + } catch (Throwable t) { + LOG.warn("Failed to GetResultSetMetadata.", t); + resp.setStatus(toTStatus(t)); + } + return resp; } @Override diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java index 49ece510191..948771c1401 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java @@ -18,16 +18,42 @@ package org.apache.flink.table.endpoint.hive.util; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import 
org.apache.flink.table.catalog.hive.util.HiveTypeUtil; +import org.apache.flink.table.gateway.api.SqlGatewayService; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.operation.OperationStatus; +import org.apache.flink.table.gateway.api.operation.OperationType; import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.hadoop.hive.serde2.thrift.Type; +import org.apache.hive.service.rpc.thrift.TCLIServiceConstants; +import org.apache.hive.service.rpc.thrift.TColumnDesc; import org.apache.hive.service.rpc.thrift.THandleIdentifier; +import org.apache.hive.service.rpc.thrift.TOperationHandle; +import org.apache.hive.service.rpc.thrift.TOperationState; +import org.apache.hive.service.rpc.thrift.TOperationType; +import org.apache.hive.service.rpc.thrift.TPrimitiveTypeEntry; import org.apache.hive.service.rpc.thrift.TSessionHandle; import org.apache.hive.service.rpc.thrift.TStatus; import org.apache.hive.service.rpc.thrift.TStatusCode; +import org.apache.hive.service.rpc.thrift.TTableSchema; +import org.apache.hive.service.rpc.thrift.TTypeDesc; +import org.apache.hive.service.rpc.thrift.TTypeEntry; +import org.apache.hive.service.rpc.thrift.TTypeQualifierValue; +import org.apache.hive.service.rpc.thrift.TTypeQualifiers; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; /** Conversion between thrift object and flink object. 
*/ @@ -40,7 +66,7 @@ public class ThriftObjectConversions { // -------------------------------------------------------------------------------------------- public static TSessionHandle toTSessionHandle(SessionHandle sessionHandle) { - return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier())); + return new TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier(), SECRET_ID)); } public static SessionHandle toSessionHandle(TSessionHandle tSessionHandle) { @@ -48,6 +74,115 @@ public class ThriftObjectConversions { return new SessionHandle(new UUID(bb.getLong(), bb.getLong())); } + // -------------------------------------------------------------------------------------------- + // Flink SessionHandle && OperationHandle from/to Hive OperationHandle + // -------------------------------------------------------------------------------------------- + + /** + * Convert {@link SessionHandle} and {@link OperationHandle} to {@link TOperationHandle}. + * + * <p>Hive uses {@link TOperationHandle} to retrieve the {@code Operation} related information. + * However, SqlGateway uses {@link SessionHandle} and {@link OperationHandle} to identify the {@code Operation}. + * Therefore, the {@link TOperationHandle} needs to contain both {@link SessionHandle} and + * {@link OperationHandle}. + * + * <p>Currently all operations in the {@link SqlGatewayService} have data. Therefore, set the + * {@code TOperationHandle#hasResultSet} to true. 
+ */ + public static TOperationHandle toTOperationHandle( + SessionHandle sessionHandle, + OperationHandle operationHandle, + OperationType operationType) { + return new TOperationHandle( + toTHandleIdentifier(operationHandle.getIdentifier(), sessionHandle.getIdentifier()), + toTOperationType(operationType), + true); + } + + public static SessionHandle toSessionHandle(TOperationHandle tOperationHandle) { + ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getSecret()); + return new SessionHandle(new UUID(bb.getLong(), bb.getLong())); + } + + public static OperationHandle toOperationHandle(TOperationHandle tOperationHandle) { + ByteBuffer bb = ByteBuffer.wrap(tOperationHandle.getOperationId().getGuid()); + return new OperationHandle(new UUID(bb.getLong(), bb.getLong())); + } + + // -------------------------------------------------------------------------------------------- + // Operation related conversions + // -------------------------------------------------------------------------------------------- + + public static TOperationType toTOperationType(OperationType type) { + switch (type) { + case EXECUTE_STATEMENT: + return TOperationType.EXECUTE_STATEMENT; + case UNKNOWN: + return TOperationType.UNKNOWN; + default: + throw new IllegalArgumentException( + String.format("Unknown operation type: %s.", type)); + } + } + + public static TOperationState toTOperationState(OperationStatus operationStatus) { + switch (operationStatus) { + case INITIALIZED: + return TOperationState.INITIALIZED_STATE; + case PENDING: + return TOperationState.PENDING_STATE; + case RUNNING: + return TOperationState.RUNNING_STATE; + case FINISHED: + return TOperationState.FINISHED_STATE; + case ERROR: + return TOperationState.ERROR_STATE; + case TIMEOUT: + return TOperationState.TIMEDOUT_STATE; + case CANCELED: + return TOperationState.CANCELED_STATE; + case CLOSED: + return TOperationState.CLOSED_STATE; + default: + throw new IllegalArgumentException( + String.format("Unknown 
operation status: %s.", operationStatus)); + } + } + + // -------------------------------------------------------------------------------------------- + // Statement related conversions + // -------------------------------------------------------------------------------------------- + + /** Similar logic in the {@code org.apache.hive.service.cli.ColumnDescriptor}. */ + public static TTableSchema toTTableSchema(ResolvedSchema schema) { + TTableSchema tSchema = new TTableSchema(); + + for (int i = 0; i < schema.getColumnCount(); i++) { + Column column = schema.getColumns().get(i); + TColumnDesc desc = new TColumnDesc(); + desc.setColumnName(column.getName()); + column.getComment().ifPresent(desc::setComment); + desc.setPosition(i); + + TTypeDesc typeDesc = new TTypeDesc(); + + // Hive uses the TPrimitiveTypeEntry only. Please refer to TypeDescriptor#toTTypeDesc. + DataType columnType = column.getDataType(); + TPrimitiveTypeEntry typeEntry = + new TPrimitiveTypeEntry( + Type.getType(HiveTypeUtil.toHiveTypeInfo(columnType, false)).toTType()); + + if (hasTypeQualifiers(columnType.getLogicalType())) { + typeEntry.setTypeQualifiers(toTTypeQualifiers(columnType.getLogicalType())); + } + typeDesc.addToTypes(TTypeEntry.primitiveEntry(typeEntry)); + + desc.setTypeDesc(typeDesc); + tSchema.addToColumns(desc); + } + return tSchema; + } + public static TStatus toTStatus(Throwable t) { TStatus tStatus = new TStatus(TStatusCode.ERROR_STATUS); tStatus.setErrorMessage(t.getMessage()); @@ -57,7 +192,7 @@ public class ThriftObjectConversions { // -------------------------------------------------------------------------------------------- - private static THandleIdentifier toTHandleIdentifier(UUID publicId) { + private static THandleIdentifier toTHandleIdentifier(UUID publicId, UUID secretId) { byte[] guid = new byte[16]; byte[] secret = new byte[16]; ByteBuffer guidBB = ByteBuffer.wrap(guid); @@ -65,11 +200,54 @@ public class ThriftObjectConversions { 
guidBB.putLong(publicId.getMostSignificantBits()); guidBB.putLong(publicId.getLeastSignificantBits()); - secretBB.putLong(SECRET_ID.getMostSignificantBits()); - secretBB.putLong(SECRET_ID.getLeastSignificantBits()); + secretBB.putLong(secretId.getMostSignificantBits()); + secretBB.putLong(secretId.getLeastSignificantBits()); return new THandleIdentifier(ByteBuffer.wrap(guid), ByteBuffer.wrap(secret)); } + /** Only the type that has length, precision or scale has {@link TTypeQualifiers}. */ + private static boolean hasTypeQualifiers(LogicalType type) { + switch (type.getTypeRoot()) { + case DECIMAL: + case CHAR: + case VARCHAR: + return true; + default: + return false; + } + } + + /** + * Create {@link TTypeQualifiers} from {@link LogicalType}. The logic is almost same in the + * {@code org.apache.hive.service.cli#toTTypeQualifiers}. + */ + private static TTypeQualifiers toTTypeQualifiers(LogicalType type) { + Map<String, TTypeQualifierValue> qualifiers = new HashMap<>(); + + switch (type.getTypeRoot()) { + case DECIMAL: + qualifiers.put( + TCLIServiceConstants.PRECISION, + TTypeQualifierValue.i32Value(((DecimalType) type).getPrecision())); + qualifiers.put( + TCLIServiceConstants.SCALE, + TTypeQualifierValue.i32Value(((DecimalType) type).getScale())); + break; + case VARCHAR: + qualifiers.put( + TCLIServiceConstants.CHARACTER_MAXIMUM_LENGTH, + TTypeQualifierValue.i32Value(((VarCharType) type).getLength())); + break; + case CHAR: + qualifiers.put( + TCLIServiceConstants.CHARACTER_MAXIMUM_LENGTH, + TTypeQualifierValue.i32Value(((CharType) type).getLength())); + break; + } + + return new TTypeQualifiers(qualifiers); + } + /** * Converts a {@link Throwable} object into a flattened list of texts including its stack trace * and the stack traces of the nested causes. 
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java index 511170aa975..aecc8bc2cfa 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversionsTest.java @@ -18,12 +18,55 @@ package org.apache.flink.table.endpoint.hive.util; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.operation.OperationStatus; +import org.apache.flink.table.gateway.api.operation.OperationType; import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.rpc.thrift.TOperationHandle; +import org.apache.hive.service.rpc.thrift.TOperationState; import org.junit.jupiter.api.Test; +import java.sql.Types; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.api.DataTypes.BIGINT; +import static org.apache.flink.table.api.DataTypes.BOOLEAN; +import static org.apache.flink.table.api.DataTypes.BYTES; +import static org.apache.flink.table.api.DataTypes.DATE; +import static org.apache.flink.table.api.DataTypes.DECIMAL; +import static org.apache.flink.table.api.DataTypes.DOUBLE; +import static org.apache.flink.table.api.DataTypes.FLOAT; +import static 
org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.SMALLINT; +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.apache.flink.table.api.DataTypes.TIMESTAMP; +import static org.apache.flink.table.api.DataTypes.TINYINT; +import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toOperationHandle; import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle; +import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationHandle; +import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationState; import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTSessionHandle; +import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTTableSchema; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.CANCELED; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.CLOSED; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.ERROR; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.FINISHED; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.INITIALIZED; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.PENDING; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.RUNNING; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.TIMEOUT; import static org.junit.jupiter.api.Assertions.assertEquals; /** Test for {@link ThriftObjectConversions}. 
*/ @@ -34,4 +77,100 @@ class ThriftObjectConversionsTest { SessionHandle originSessionHandle = SessionHandle.create(); assertEquals(toSessionHandle(toTSessionHandle(originSessionHandle)), originSessionHandle); } + + @Test + public void testConvertSessionHandleAndOperationHandle() { + SessionHandle originSessionHandle = SessionHandle.create(); + OperationHandle originOperationHandle = OperationHandle.create(); + TOperationHandle tOperationHandle = + toTOperationHandle( + originSessionHandle, originOperationHandle, OperationType.UNKNOWN); + + assertEquals(toSessionHandle(tOperationHandle), originSessionHandle); + assertEquals(toOperationHandle(tOperationHandle), originOperationHandle); + } + + @Test + public void testConvertOperationStatus() { + Map<OperationStatus, TOperationState> expectedMappings = new HashMap<>(); + expectedMappings.put(INITIALIZED, TOperationState.INITIALIZED_STATE); + expectedMappings.put(PENDING, TOperationState.PENDING_STATE); + expectedMappings.put(RUNNING, TOperationState.RUNNING_STATE); + expectedMappings.put(FINISHED, TOperationState.FINISHED_STATE); + expectedMappings.put(CANCELED, TOperationState.CANCELED_STATE); + expectedMappings.put(CLOSED, TOperationState.CLOSED_STATE); + expectedMappings.put(ERROR, TOperationState.ERROR_STATE); + expectedMappings.put(TIMEOUT, TOperationState.TIMEDOUT_STATE); + + for (OperationStatus status : expectedMappings.keySet()) { + assertEquals(expectedMappings.get(status), toTOperationState(status)); + } + } + + @Test + public void testToTTableSchema() { + for (DataTypeSpec spec : getDataTypeSpecs()) { + TableSchema actual = + new TableSchema( + toTTableSchema( + DataTypeUtils.expandCompositeTypeToSchema(spec.flinkType))); + List<Integer> javaSqlTypes = + Arrays.stream(actual.toTypeDescriptors()) + .map(desc -> desc.getType().toJavaSQLType()) + .collect(Collectors.toList()); + + assertEquals(Collections.singletonList(spec.sqlType), javaSqlTypes); + } + } + + // 
-------------------------------------------------------------------------------------------- + + private List<DataTypeSpec> getDataTypeSpecs() { + return Arrays.asList( + DataTypeSpec.newSpec().withType(BOOLEAN()).expectSqlType(Types.BOOLEAN), + DataTypeSpec.newSpec() + .withType(TINYINT()) + // TINYINT is the alias of the BYTE in Hive. + .expectSqlType(Types.BINARY), + DataTypeSpec.newSpec().withType(SMALLINT()).expectSqlType(Types.SMALLINT), + DataTypeSpec.newSpec().withType(INT()).expectSqlType(Types.INTEGER), + DataTypeSpec.newSpec().withType(BIGINT()).expectSqlType(Types.BIGINT), + DataTypeSpec.newSpec().withType(FLOAT()).expectSqlType(Types.FLOAT), + DataTypeSpec.newSpec().withType(DOUBLE()).expectSqlType(Types.DOUBLE), + DataTypeSpec.newSpec().withType(DECIMAL(9, 6)).expectSqlType(Types.DECIMAL), + DataTypeSpec.newSpec().withType(STRING()).expectSqlType(Types.VARCHAR), + DataTypeSpec.newSpec().withType(BYTES()).expectSqlType(Types.BINARY), + DataTypeSpec.newSpec().withType(DATE()).expectSqlType(Types.DATE), + DataTypeSpec.newSpec().withType(TIMESTAMP(4)).expectSqlType(Types.TIMESTAMP), + DataTypeSpec.newSpec() + .withType(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())) + .expectSqlType(Types.JAVA_OBJECT), + DataTypeSpec.newSpec() + .withType( + DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()))) + // Hive uses STRING type + .expectSqlType(Types.VARCHAR)); + } + + private static class DataTypeSpec { + DataType flinkType; + Integer sqlType; + RowData flinkValue; + + public static DataTypeSpec newSpec() { + DataTypeSpec spec = new DataTypeSpec(); + spec.flinkValue = new GenericRowData(1); + return spec; + } + + public DataTypeSpec withType(DataType flinkType) { + this.flinkType = DataTypes.ROW(flinkType); + return this; + } + + public DataTypeSpec expectSqlType(int sqlType) { + this.sqlType = sqlType; + return this; + } + } } diff --git 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java index 7e92610fb6e..70e326705c6 100644 --- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java @@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.api; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.gateway.api.operation.OperationHandle; import org.apache.flink.table.gateway.api.operation.OperationType; import org.apache.flink.table.gateway.api.results.OperationInfo; @@ -108,6 +109,16 @@ public interface SqlGatewayService { OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) throws SqlGatewayException; + /** + * Get the result schema for the specified Operation. + * + * @param sessionHandle handle to identify the session. + * @param operationHandle handle to identify the operation. 
+ */ + ResolvedSchema getOperationResultSchema( + SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException; + // ------------------------------------------------------------------------------------------- // Statements // ------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java index 7ebb28f5169..3abe6bdb7ce 100644 --- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java @@ -22,24 +22,28 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.gateway.api.operation.OperationStatus; import org.apache.flink.table.gateway.api.operation.OperationType; +import javax.annotation.Nullable; + import java.util.Objects; +import java.util.Optional; /** Information of the {@code Operation}. 
*/ @PublicEvolving public class OperationInfo { private final OperationStatus status; - private final boolean hasResults; private final OperationType type; + @Nullable private final Exception exception; - public OperationInfo(OperationStatus status, OperationType type, boolean hasResults) { - this.status = status; - this.type = type; - this.hasResults = hasResults; + public OperationInfo(OperationStatus status, OperationType type) { + this(status, type, null); } - public boolean isHasResults() { - return hasResults; + public OperationInfo( + OperationStatus status, OperationType type, @Nullable Exception exception) { + this.status = status; + this.type = type; + this.exception = exception; } public OperationType getType() { @@ -50,6 +54,10 @@ public class OperationInfo { return status; } + public Optional<Exception> getException() { + return Optional.ofNullable(exception); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -59,11 +67,11 @@ public class OperationInfo { return false; } OperationInfo that = (OperationInfo) o; - return hasResults == that.hasResults && status == that.status && type == that.type; + return status == that.status && type == that.type && exception == that.exception; } @Override public int hashCode() { - return Objects.hash(status, hasResults, type); + return Objects.hash(status, type, exception); } } diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java index f962e9c727e..49ba033a314 100644 --- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java +++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java @@ -19,6 +19,7 @@ package org.apache.flink.table.gateway.api.utils; import 
org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.gateway.api.SqlGatewayService; import org.apache.flink.table.gateway.api.operation.OperationHandle; import org.apache.flink.table.gateway.api.operation.OperationType; @@ -89,4 +90,11 @@ public class MockedSqlGatewayService implements SqlGatewayService { throws SqlGatewayException { throw new UnsupportedOperationException(); } + + @Override + public ResolvedSchema getOperationResultSchema( + SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException { + throw new UnsupportedOperationException(); + } } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java index fb68828aedf..7ec185dd8b0 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java @@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.gateway.api.SqlGatewayService; import org.apache.flink.table.gateway.api.operation.OperationHandle; import org.apache.flink.table.gateway.api.operation.OperationType; @@ -124,6 +125,20 @@ public class SqlGatewayServiceImpl implements SqlGatewayService { } } + @Override + public ResolvedSchema getOperationResultSchema( + SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException { + try { + return getSession(sessionHandle) + .getOperationManager() + .getOperationResultSchema(operationHandle); + } catch (Throwable t) { + 
LOG.error("Failed to getOperationResultSchema.", t); + throw new SqlGatewayException("Failed to getOperationResultSchema.", t); + } + } + @Override public OperationHandle executeStatement( SessionHandle sessionHandle, diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java index 80ec381eb96..602a3fc3552 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java @@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service.operation; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.gateway.api.operation.OperationHandle; import org.apache.flink.table.gateway.api.operation.OperationStatus; import org.apache.flink.table.gateway.api.operation.OperationType; @@ -150,6 +151,15 @@ public class OperationManager { return getOperation(operationHandle).getOperationInfo(); } + /** + * Get the {@link ResolvedSchema} of the operation. + * + * @param operationHandle identifies the {@link Operation}. + */ + public ResolvedSchema getOperationResultSchema(OperationHandle operationHandle) { + return getOperation(operationHandle).getResultSchema(); + } + /** * Get the results of the operation. 
* @@ -309,7 +319,17 @@ public class OperationManager { } public OperationInfo getOperationInfo() { - return new OperationInfo(status.get(), operationType, hasResults); + return new OperationInfo(status.get(), operationType); + } + + public ResolvedSchema getResultSchema() { + OperationStatus current = status.get(); + if (current != OperationStatus.FINISHED || !hasResults) { + throw new IllegalStateException( + "The result schema is available when the Operation is in FINISHED state and the Operation has data."); + } + + return resultFetcher.getResultSchema(); } private void updateState(OperationStatus toStatus) { diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java index 6812947ead9..98da1c9ac2a 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java @@ -89,6 +89,10 @@ public class ResultFetcher { resultStore.close(); } + public ResolvedSchema getResultSchema() { + return resultSchema; + } + /** * Fetch results from the result store. It tries to return the data cached in the buffer first. * If the buffer is empty, then fetch results from the {@link ResultStore}. 
It's possible diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java index 0d0069f388e..33b6d5e86d1 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java @@ -193,12 +193,12 @@ public class SqlGatewayServiceITCase extends AbstractTestBase { startRunningLatch.await(); assertEquals( - new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN, true), + new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN), service.getOperationInfo(sessionHandle, operationHandle)); endRunningLatch.countDown(); OperationInfo expectedInfo = - new OperationInfo(OperationStatus.FINISHED, OperationType.UNKNOWN, true); + new OperationInfo(OperationStatus.FINISHED, OperationType.UNKNOWN); CommonTestUtils.waitUtil( () -> service.getOperationInfo(sessionHandle, operationHandle).equals(expectedInfo), @@ -237,13 +237,13 @@ public class SqlGatewayServiceITCase extends AbstractTestBase { startRunningLatch.await(); assertEquals( - new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN, true), + new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN), service.getOperationInfo(sessionHandle, operationHandle)); service.cancelOperation(sessionHandle, operationHandle); assertEquals( - new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN, true), + new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN), service.getOperationInfo(sessionHandle, operationHandle)); service.closeOperation(sessionHandle, operationHandle); assertEquals(0, sessionManager.getOperationCount(sessionHandle)); diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java index 40a3ec44f34..6348ca0d575 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceStatementITCase.java @@ -231,10 +231,6 @@ public class SqlGatewayServiceStatementITCase { Duration.ofSeconds(100), "Failed to wait operation finish."); - if (!service.getOperationInfo(sessionHandle, operationHandle).isHasResults()) { - return Tag.INFO.addTag(""); - } - // The content in the result of the `explain` and `show create` statement is large, so it's // more straightforward to just print the content without the table. if (statement.toUpperCase().startsWith("EXPLAIN")
