fsk119 commented on code in PR #20298:
URL: https://github.com/apache/flink/pull/20298#discussion_r930961893


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -18,28 +18,238 @@
 
 package org.apache.flink.table.endpoint.hive.util;
 
-import org.apache.flink.table.gateway.api.HandleIdentifier;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+import org.apache.flink.types.RowKind;
 
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.thrift.Type;
+import org.apache.hive.service.rpc.thrift.TBinaryColumn;
+import org.apache.hive.service.rpc.thrift.TBoolColumn;
+import org.apache.hive.service.rpc.thrift.TBoolValue;
+import org.apache.hive.service.rpc.thrift.TByteColumn;
+import org.apache.hive.service.rpc.thrift.TByteValue;
+import org.apache.hive.service.rpc.thrift.TCLIServiceConstants;
+import org.apache.hive.service.rpc.thrift.TColumn;
+import org.apache.hive.service.rpc.thrift.TColumnDesc;
+import org.apache.hive.service.rpc.thrift.TColumnValue;
+import org.apache.hive.service.rpc.thrift.TDoubleColumn;
+import org.apache.hive.service.rpc.thrift.TDoubleValue;
+import org.apache.hive.service.rpc.thrift.TFetchOrientation;
 import org.apache.hive.service.rpc.thrift.THandleIdentifier;
+import org.apache.hive.service.rpc.thrift.TI16Column;
+import org.apache.hive.service.rpc.thrift.TI16Value;
+import org.apache.hive.service.rpc.thrift.TI32Column;
+import org.apache.hive.service.rpc.thrift.TI32Value;
+import org.apache.hive.service.rpc.thrift.TI64Column;
+import org.apache.hive.service.rpc.thrift.TI64Value;
+import org.apache.hive.service.rpc.thrift.TOperationHandle;
+import org.apache.hive.service.rpc.thrift.TOperationState;
+import org.apache.hive.service.rpc.thrift.TOperationType;
+import org.apache.hive.service.rpc.thrift.TPrimitiveTypeEntry;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.apache.hive.service.rpc.thrift.TRow;
+import org.apache.hive.service.rpc.thrift.TRowSet;
 import org.apache.hive.service.rpc.thrift.TSessionHandle;
 import org.apache.hive.service.rpc.thrift.TStatus;
 import org.apache.hive.service.rpc.thrift.TStatusCode;
+import org.apache.hive.service.rpc.thrift.TStringColumn;
+import org.apache.hive.service.rpc.thrift.TStringValue;
+import org.apache.hive.service.rpc.thrift.TTableSchema;
+import org.apache.hive.service.rpc.thrift.TTypeDesc;
+import org.apache.hive.service.rpc.thrift.TTypeEntry;
+import org.apache.hive.service.rpc.thrift.TTypeQualifierValue;
+import org.apache.hive.service.rpc.thrift.TTypeQualifiers;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HIVE_CLI_SERVICE_PROTOCOL_V6;
 
 /** Conversion between thrift object and flink object. */
 public class ThriftObjectConversions {
 
+    private static final UUID SECRET_ID = 
UUID.fromString("b06fa16a-3d16-475f-b510-6c64abb9b173");
+
+    // 
--------------------------------------------------------------------------------------------
+    // Flink SessionHandle from/to Hive SessionHandle
+    // 
--------------------------------------------------------------------------------------------
+
     public static TSessionHandle toTSessionHandle(SessionHandle sessionHandle) 
{
-        return new 
TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier()));
+        return new 
TSessionHandle(toTHandleIdentifier(sessionHandle.getIdentifier(), SECRET_ID));
     }
 
     public static SessionHandle toSessionHandle(TSessionHandle tSessionHandle) 
{
-        return new 
SessionHandle(toHandleIdentifier(tSessionHandle.getSessionId()));
+        ByteBuffer bb = 
ByteBuffer.wrap(tSessionHandle.getSessionId().getGuid());
+        return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Flink SessionHandle && OperationHandle from/to Hive OperationHandle
+    // 
--------------------------------------------------------------------------------------------
+
+    public static TOperationHandle toTOperationHandle(
+            SessionHandle sessionHandle,
+            OperationHandle operationHandle,
+            OperationType operationType,
+            boolean hasResult) {
+        return new TOperationHandle(
+                toTHandleIdentifier(sessionHandle.getIdentifier(), 
operationHandle.getIdentifier()),
+                toTOperationType(operationType),
+                hasResult);
+    }
+
+    public static SessionHandle toSessionHandle(TOperationHandle 
tOperationHandle) {
+        ByteBuffer bb = 
ByteBuffer.wrap(tOperationHandle.getOperationId().getGuid());
+        return new SessionHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    public static OperationHandle toOperationHandle(TOperationHandle 
tOperationHandle) {
+        ByteBuffer bb = 
ByteBuffer.wrap(tOperationHandle.getOperationId().getSecret());
+        return new OperationHandle(new UUID(bb.getLong(), bb.getLong()));
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Operation related conversions
+    // 
--------------------------------------------------------------------------------------------
+
+    public static TOperationType toTOperationType(OperationType type) {
+        switch (type) {
+            case EXECUTE_STATEMENT:
+                return TOperationType.EXECUTE_STATEMENT;
+            case UNKNOWN:
+                return TOperationType.UNKNOWN;
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unknown operation type: %s.", type));
+        }
+    }
+
+    public static TOperationState toTOperationState(OperationStatus 
operationStatus) {
+        switch (operationStatus) {
+            case INITIALIZED:
+                return TOperationState.INITIALIZED_STATE;
+            case PENDING:
+                return TOperationState.PENDING_STATE;
+            case RUNNING:
+                return TOperationState.RUNNING_STATE;
+            case FINISHED:
+                return TOperationState.FINISHED_STATE;
+            case ERROR:
+                return TOperationState.ERROR_STATE;
+            case TIMEOUT:
+                return TOperationState.TIMEDOUT_STATE;
+            case CANCELED:
+                return TOperationState.CANCELED_STATE;
+            case CLOSED:
+                return TOperationState.CLOSED_STATE;
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unknown operation status: %s.", 
operationStatus));
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Statement related conversions
+    // 
--------------------------------------------------------------------------------------------
+
+    public static FetchOrientation toFetchOrientation(int fetchOrientation) {
+        if (fetchOrientation == TFetchOrientation.FETCH_NEXT.getValue()) {
+            return FetchOrientation.FETCH_NEXT;
+        } else if (fetchOrientation == 
TFetchOrientation.FETCH_PRIOR.getValue()) {
+            return FetchOrientation.FETCH_PRIOR;
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Unsupported fetch orientation: %s.", 
fetchOrientation));
+        }
+    }
+
+    /** Similar logic in the {@code 
org.apache.hive.service.cli.ColumnDescriptor}. */
+    public static TTableSchema toTTableSchema(ResolvedSchema schema) {
+        TTableSchema tSchema = new TTableSchema();
+
+        for (int i = 0; i < schema.getColumnCount(); i++) {
+            Column column = schema.getColumns().get(i);
+            TColumnDesc desc = new TColumnDesc();
+            desc.setColumnName(column.getName());
+            column.getComment().ifPresent(desc::setComment);
+            desc.setPosition(i);
+
+            TTypeDesc typeDesc = new TTypeDesc();
+
+            // Hive uses the TPrimitiveTypeEntry only. Please refer to 
TypeDescriptor#toTTypeDesc.
+            DataType columnType = column.getDataType();
+            TPrimitiveTypeEntry typeEntry =
+                    new TPrimitiveTypeEntry(

Review Comment:
   No. We will transform the complex types to JSON strings on the server side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to