twalthr commented on a change in pull request #17739:
URL: https://github.com/apache/flink/pull/17739#discussion_r748133658



##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableResultView.java
##########
@@ -309,12 +305,7 @@ private void updatePage() {
         // convert page
         final List<String[]> stringRows =
                 rows.stream()
-                        .map(
-                                r ->
-                                        PrintUtils.rowToString(
-                                                r,
-                                                
resultDescriptor.getResultSchema(),
-                                                sessionTimeZone))
+                        
.map(resultDescriptor.getRowDataStringConverter()::toString)

Review comment:
       calling this method `toString` could be confusing. how about 
`convertToColumnStrings` or so?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.utils.CastExecutor;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.table.utils.print.PrintStyle;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+import static org.apache.flink.table.api.DataTypes.STRING;
+
+/** {@link RowData} to {@link String} converter using {@link CastRule}. */
+public final class RowDataToStringConverterImpl implements 
RowDataToStringConverter {
+
+    private final Function<RowData, String>[] converters;

Review comment:
       nit: call it `columnConverters`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
##########
@@ -88,7 +89,8 @@ public static RelNode convertCollectToRel(
             RelNode input,
             CollectModifyOperation collectModifyOperation,
             Configuration configuration,
-            ClassLoader classLoader) {
+            ClassLoader classLoader,
+            ZoneId zoneId) {

Review comment:
       get the ZoneId from configuration? we should reduce the number of 
arguments to methods esp. if it is encoded in the other arg. Also, we should 
not pass `TableConfig` anymore as this is a pure API class. `ReadableConfig` 
should be passed around.

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
##########
@@ -126,34 +122,29 @@ private void checkAndCleanUpQuery(boolean cleanUpQuery) {
     private void printBatchResults(AtomicInteger receivedRowCount) {
         final List<RowData> resultRows = waitBatchResults();
         receivedRowCount.addAndGet(resultRows.size());
-        PrintUtils.printAsTableauForm(
-                resultDescriptor.getResultSchema(),
-                resultRows.iterator(),
-                terminal.writer(),
-                sessionTimeZone);
+        TableauStyle style =
+                PrintStyle.tableauWithDataInferredColumnWidths(
+                        resultDescriptor.getResultSchema(),
+                        resultDescriptor.getRowDataStringConverter(),
+                        PrintStyle.DEFAULT_MAX_COLUMN_WIDTH,
+                        false,
+                        false);
+        style.print(resultRows.iterator(), terminal.writer());
     }
 
     private void printStreamingResults(AtomicInteger receivedRowCount) {
-        List<Column> columns = resultDescriptor.getResultSchema().getColumns();
-        final String[] fieldNames =
-                Stream.concat(
-                                Stream.of(PrintUtils.ROW_KIND_COLUMN),
-                                columns.stream().map(Column::getName))
-                        .toArray(String[]::new);
-
-        final int[] colWidths =
-                PrintUtils.columnWidthsByType(
-                        columns,
+        TableauStyle style =
+                PrintStyle.tableauWithTypeInferredColumnWidths(
+                        resultDescriptor.getResultSchema(),
+                        resultDescriptor.getRowDataStringConverter(),

Review comment:
       Sorry for starting this discussion again but: Why do we actually need to 
pass the `RowDataStringConverter` to the CLI? Couldn't 
`Executor#retrieveResultPage` and `Executor#retrieveResultChanges` return 
`String[]`? If we would break up the architecture into client/server, this 
would simplify the serialization because we neither need to serialize `RowData` 
nor `RowDataStringConverter`.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/RowDataToStringConverterImpl.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.utils.CastExecutor;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.table.utils.print.PrintStyle;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+import static org.apache.flink.table.api.DataTypes.STRING;
+
+/** {@link RowData} to {@link String} converter using {@link CastRule}. */
+public final class RowDataToStringConverterImpl implements 
RowDataToStringConverter {
+
+    private final Function<RowData, String>[] converters;
+
+    @VisibleForTesting
+    public RowDataToStringConverterImpl(DataType dataType) {
+        this(
+                dataType,
+                DateTimeUtils.UTC_ZONE.toZoneId(),
+                Thread.currentThread().getContextClassLoader());
+    }
+
+    @SuppressWarnings("unchecked")
+    public RowDataToStringConverterImpl(DataType dataType, ZoneId zoneId, 
ClassLoader classLoader) {
+        List<DataType> rowDataTypes = DataType.getFieldDataTypes(dataType);
+        this.converters = new Function[rowDataTypes.size()];
+
+        for (int i = 0; i < rowDataTypes.size(); i++) {
+            final int index = i;
+            LogicalType fieldType = rowDataTypes.get(index).getLogicalType();
+            RowData.FieldGetter getter = RowData.createFieldGetter(fieldType, 
index);
+            CastExecutor<Object, StringData> castExecutor =
+                    (CastExecutor<Object, StringData>)
+                            CastRuleProvider.create(
+                                    CastRule.Context.create(zoneId, 
classLoader),
+                                    fieldType,
+                                    STRING().getLogicalType());
+            if (castExecutor == null) {
+                // Fallback in case no casting rule is defined
+                this.converters[index] =
+                        row -> {
+                            if (row.isNullAt(index)) {
+                                return PrintStyle.NULL_VALUE;
+                            }
+                            return 
Objects.toString(getter.getFieldOrNull(row));

Review comment:
       isn't this very unlikely? shouldn't we rather throw an exception because 
it would be a bug.

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/print/TableauStyle.java
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils.print;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import com.ibm.icu.lang.UCharacter;
+import com.ibm.icu.lang.UProperty;
+
+import javax.annotation.Nullable;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * Print the result and content as tableau form.
+ *
+ * <p>For example: (printRowKind is true)
+ *
+ * <pre>
+ * +----+-------------+---------+-------------+
+ * | op | boolean_col | int_col | varchar_col |
+ * +----+-------------+---------+-------------+
+ * | +I |        true |       1 |         abc |
+ * | -U |       false |       2 |         def |
+ * | +U |       false |       3 |         def |
+ * | -D |      &lt;NULL&gt; |  &lt;NULL&gt; |      &lt;NULL&gt; |
+ * +----+-------------+---------+-------------+
+ * 4 rows in set
+ * </pre>
+ */
+@Internal
+public final class TableauStyle implements PrintStyle {
+
+    // constants for printing
+    private static final String ROW_KIND_COLUMN = "op";
+    private static final String COLUMN_TRUNCATED_FLAG = "...";
+
+    private final RowDataToStringConverter converter;
+
+    /** The max width of a column. */
+    private final int maxColumnWidth;
+
+    /**
+     * A flag to indicate whether null should be printed as empty string more 
than {@code <NULL>}.
+     */
+    private final boolean printNullAsEmpty;
+
+    /** A flag to indicate whether print row kind info. */
+    private final boolean printRowKind;
+
+    private int[] columnWidths;
+
+    private String[] columnNames;
+
+    TableauStyle(
+            ResolvedSchema resolvedSchema,
+            RowDataToStringConverter converter,
+            int[] columnWidths,
+            int maxColumnWidth,
+            boolean printNullAsEmpty,
+            boolean printRowKind) {
+        this.converter = converter;
+        this.columnWidths = columnWidths;
+        this.maxColumnWidth = maxColumnWidth;
+        this.printNullAsEmpty = printNullAsEmpty;
+        this.printRowKind = printRowKind;
+
+        if (printRowKind) {
+            this.columnNames =
+                    Stream.concat(
+                                    Stream.of(ROW_KIND_COLUMN),
+                                    resolvedSchema.getColumnNames().stream())
+                            .toArray(String[]::new);
+        } else {
+            this.columnNames = resolvedSchema.getColumnNames().toArray(new 
String[0]);
+        }
+    }
+
+    /** Returns null if the column widths are not precomputed using the row 
type. */
+    public @Nullable int[] getColumnWidths() {
+        return columnWidths;
+    }
+
+    @Override
+    public void print(Iterator<RowData> it, PrintWriter printWriter) {
+        if (!it.hasNext()) {
+            printWriter.println("Empty set");
+            printWriter.flush();
+            return;
+        }
+
+        if (columnWidths == null) {
+            final List<RowData> rows = new ArrayList<>();
+            final List<String[]> content = new ArrayList<>();
+            content.add(columnNames);
+            while (it.hasNext()) {
+                RowData row = it.next();
+                rows.add(row);
+                content.add(rowFieldsToString(row));
+            }
+            this.columnWidths = columnWidthsByContent(columnNames, content, 
maxColumnWidth);
+            it = rows.iterator();
+        }
+
+        // print border line
+        printBorderLine(printWriter);
+        // print field names
+        printColumnNamesTableauRow(printWriter);
+        // print border line
+        printBorderLine(printWriter);
+
+        long numRows = 0;
+        while (it.hasNext()) {
+            String[] cols = rowFieldsToString(it.next());
+
+            // print content
+            printTableauRow(cols, printWriter);
+            numRows++;
+        }
+
+        // print border line
+        printBorderLine(printWriter);
+        final String rowTerm = numRows > 1 ? "rows" : "row";
+        printWriter.println(numRows + " " + rowTerm + " in set");
+        printWriter.flush();
+    }
+
+    public String[] rowFieldsToString(RowData row) {
+        final int len = printRowKind ? row.getArity() + 1 : row.getArity();
+        final String[] result = new String[len];
+
+        final int offset = printRowKind ? 1 : 0;
+        final String[] conversionResult = converter.toString(row);
+
+        if (printRowKind) {
+            result[0] = row.getRowKind().shortString();
+        }
+
+        for (int i = 0; i < row.getArity(); i++) {
+            if (row.isNullAt(i) && printNullAsEmpty) {
+                result[i + offset] = "";
+            } else {
+                result[i + offset] = conversionResult[i];
+            }
+        }
+        return result;
+    }
+
+    public void printColumnNamesTableauRow(PrintWriter printWriter) {
+        this.printTableauRow(columnNames, printWriter);
+    }
+
+    public void printTableauRow(String[] cols, PrintWriter printWriter) {
+        if (columnWidths == null) {
+            this.columnWidths =
+                    columnWidthsByContent(
+                            this.columnNames, Collections.singletonList(cols), 
maxColumnWidth);
+        }
+
+        StringBuilder sb = new StringBuilder();

Review comment:
       nit: can be `final`

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/print/PrintStyle.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils.print;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+/** Root interface for all print styles. */

Review comment:
       But what is a print style?

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/print/PrintStyle.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils.print;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+/** Root interface for all print styles. */
+@Internal
+public interface PrintStyle {
+
+    /** Default max column width. */
+    int DEFAULT_MAX_COLUMN_WIDTH = 30;
+
+    /** Value used to print {@code null}. */
+    String NULL_VALUE = "<NULL>";
+
+    /**
+     * Displays the result.
+     *
+     * <p><b>NOTE:</b> please make sure the data to print is small enough to 
be stored in java heap
+     * memory if the column width is derived from content 
(`deriveColumnWidthByType` is false).

Review comment:
       update JavaDocs? `deriveColumnWidthByType` is not in this class

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/print/PrintStyle.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils.print;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+/** Root interface for all print styles. */
+@Internal
+public interface PrintStyle {
+
+    /** Default max column width. */
+    int DEFAULT_MAX_COLUMN_WIDTH = 30;
+
+    /** Value used to print {@code null}. */
+    String NULL_VALUE = "<NULL>";
+
+    /**
+     * Displays the result.
+     *
+     * <p><b>NOTE:</b> please make sure the data to print is small enough to 
be stored in java heap
+     * memory if the column width is derived from content 
(`deriveColumnWidthByType` is false).
+     *
+     * @param it The iterator for the data to print
+     * @param printWriter The writer to write to
+     */
+    void print(Iterator<RowData> it, PrintWriter printWriter);
+
+    /**
+     * Create a new {@link TableauStyle} using column widths computed from the 
type.
+     *
+     * @param schema the schema of the data to print
+     * @param converter the converter to use to convert field values to string
+     * @param maxColumnWidth Max column width
+     * @param printNullAsEmpty A flag to indicate whether null should be 
printed as empty string
+     *     more than {@code <NULL>}
+     * @param printRowKind A flag to indicate whether print row kind info.
+     */
+    static TableauStyle tableauWithTypeInferredColumnWidths(
+            ResolvedSchema schema,
+            RowDataToStringConverter converter,
+            int maxColumnWidth,
+            boolean printNullAsEmpty,
+            boolean printRowKind) {
+        Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should 
be greater than 0");
+        return new TableauStyle(
+                schema,
+                converter,
+                TableauStyle.columnWidthsByType(
+                        schema.getColumns(), maxColumnWidth, printNullAsEmpty, 
printRowKind),
+                maxColumnWidth,
+                printNullAsEmpty,
+                printRowKind);
+    }
+
+    /**
+     * Like {@link #tableauWithTypeInferredColumnWidths(ResolvedSchema, 
RowDataToStringConverter,
+     * int, boolean, boolean)}, but uses the data to infer the column size.
+     *
+     * <p><b>NOTE:</b> please make sure the data to print is small enough to 
be stored in java heap
+     * memory.
+     */
+    static TableauStyle tableauWithDataInferredColumnWidths(
+            ResolvedSchema schema,
+            RowDataToStringConverter converter,
+            int maxColumnWidth,
+            boolean printNullAsEmpty,
+            boolean printRowKind) {
+        Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should 
be greater than 0");
+        return new TableauStyle(
+                schema, converter, null, maxColumnWidth, printNullAsEmpty, 
printRowKind);
+    }
+
+    /**
+     * Like {@link #tableauWithDataInferredColumnWidths(ResolvedSchema, 
RowDataToStringConverter,
+     * int, boolean, boolean)}, but using default values.
+     *
+     * <p><b>NOTE:</b> please make sure the data to print is small enough to 
be stored in java heap
+     * memory.
+     */
+    static TableauStyle tableauWithDataInferredColumnWidths(
+            ResolvedSchema schema, RowDataToStringConverter converter) {
+        return PrintStyle.tableauWithDataInferredColumnWidths(
+                schema, converter, DEFAULT_MAX_COLUMN_WIDTH, false, false);
+    }
+
+    /**
+     * Create a raw content print style, which only print the result content 
as raw form. column
+     * delimiter is ",", row delimiter is "\n".
+     */
+    static RawContentStyle rawContent(RowDataToStringConverter converter) {

Review comment:
       who needs this?

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/print/TableauStyle.java
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils.print;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import com.ibm.icu.lang.UCharacter;
+import com.ibm.icu.lang.UProperty;
+
+import javax.annotation.Nullable;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * Print the result and content as tableau form.
+ *
+ * <p>For example: (printRowKind is true)
+ *
+ * <pre>
+ * +----+-------------+---------+-------------+
+ * | op | boolean_col | int_col | varchar_col |
+ * +----+-------------+---------+-------------+
+ * | +I |        true |       1 |         abc |
+ * | -U |       false |       2 |         def |
+ * | +U |       false |       3 |         def |
+ * | -D |      &lt;NULL&gt; |  &lt;NULL&gt; |      &lt;NULL&gt; |
+ * +----+-------------+---------+-------------+
+ * 4 rows in set
+ * </pre>
+ */
+@Internal
+public final class TableauStyle implements PrintStyle {
+
+    // constants for printing
+    private static final String ROW_KIND_COLUMN = "op";
+    private static final String COLUMN_TRUNCATED_FLAG = "...";
+
+    private final RowDataToStringConverter converter;
+
+    /** The max width of a column. */
+    private final int maxColumnWidth;
+
+    /**
+     * A flag to indicate whether null should be printed as empty string more 
than {@code <NULL>}.
+     */
+    private final boolean printNullAsEmpty;
+
+    /** A flag to indicate whether print row kind info. */
+    private final boolean printRowKind;
+
+    private int[] columnWidths;
+
+    private String[] columnNames;
+
+    TableauStyle(
+            ResolvedSchema resolvedSchema,
+            RowDataToStringConverter converter,
+            int[] columnWidths,
+            int maxColumnWidth,
+            boolean printNullAsEmpty,
+            boolean printRowKind) {
+        this.converter = converter;
+        this.columnWidths = columnWidths;
+        this.maxColumnWidth = maxColumnWidth;
+        this.printNullAsEmpty = printNullAsEmpty;
+        this.printRowKind = printRowKind;
+
+        if (printRowKind) {
+            this.columnNames =
+                    Stream.concat(
+                                    Stream.of(ROW_KIND_COLUMN),
+                                    resolvedSchema.getColumnNames().stream())
+                            .toArray(String[]::new);
+        } else {
+            this.columnNames = resolvedSchema.getColumnNames().toArray(new 
String[0]);
+        }
+    }
+
+    /** Returns null if the column widths are not precomputed using the row 
type. */
+    public @Nullable int[] getColumnWidths() {
+        return columnWidths;
+    }
+
+    @Override
+    public void print(Iterator<RowData> it, PrintWriter printWriter) {
+        if (!it.hasNext()) {
+            printWriter.println("Empty set");
+            printWriter.flush();
+            return;
+        }
+
+        if (columnWidths == null) {
+            final List<RowData> rows = new ArrayList<>();
+            final List<String[]> content = new ArrayList<>();
+            content.add(columnNames);
+            while (it.hasNext()) {
+                RowData row = it.next();
+                rows.add(row);
+                content.add(rowFieldsToString(row));
+            }
+            this.columnWidths = columnWidthsByContent(columnNames, content, 
maxColumnWidth);
+            it = rows.iterator();
+        }
+
+        // print border line
+        printBorderLine(printWriter);
+        // print field names
+        printColumnNamesTableauRow(printWriter);
+        // print border line
+        printBorderLine(printWriter);
+
+        long numRows = 0;
+        while (it.hasNext()) {
+            String[] cols = rowFieldsToString(it.next());
+
+            // print content
+            printTableauRow(cols, printWriter);
+            numRows++;
+        }
+
+        // print border line
+        printBorderLine(printWriter);
+        final String rowTerm = numRows > 1 ? "rows" : "row";
+        printWriter.println(numRows + " " + rowTerm + " in set");
+        printWriter.flush();
+    }
+
+    public String[] rowFieldsToString(RowData row) {
+        final int len = printRowKind ? row.getArity() + 1 : row.getArity();
+        final String[] result = new String[len];
+
+        final int offset = printRowKind ? 1 : 0;
+        final String[] conversionResult = converter.toString(row);
+
+        if (printRowKind) {
+            result[0] = row.getRowKind().shortString();
+        }
+
+        for (int i = 0; i < row.getArity(); i++) {
+            if (row.isNullAt(i) && printNullAsEmpty) {
+                result[i + offset] = "";
+            } else {
+                result[i + offset] = conversionResult[i];
+            }
+        }
+        return result;
+    }
+
+    public void printColumnNamesTableauRow(PrintWriter printWriter) {
+        this.printTableauRow(columnNames, printWriter);
+    }
+
+    public void printTableauRow(String[] cols, PrintWriter printWriter) {
+        if (columnWidths == null) {
+            this.columnWidths =
+                    columnWidthsByContent(
+                            this.columnNames, Collections.singletonList(cols), 
maxColumnWidth);
+        }
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("|");
+        int idx = 0;
+        for (String col : cols) {
+            sb.append(" ");
+            int displayWidth = getStringDisplayWidth(col);
+            if (displayWidth <= columnWidths[idx]) {
+                sb.append(EncodingUtils.repeat(' ', columnWidths[idx] - 
displayWidth));
+                sb.append(col);
+            } else {
+                sb.append(truncateString(col, columnWidths[idx] - 
COLUMN_TRUNCATED_FLAG.length()));
+                sb.append(COLUMN_TRUNCATED_FLAG);
+            }
+            sb.append(" |");
+            idx++;
+        }
+        printWriter.println(sb);
+        printWriter.flush();
+    }
+
+    public void printBorderLine(PrintWriter printWriter) {
+        if (columnWidths == null) {
+            throw new IllegalStateException(
+                    "Column widths should be initialized before printing a 
border line");
+        }
+
+        printWriter.append("+");
+        for (int width : columnWidths) {
+            printWriter.append(EncodingUtils.repeat('-', width + 1));
+            printWriter.append("-+");
+        }
+        printWriter.println();
+    }
+
+    // Package private and private static methods to deal with complexity of 
string writing and
+    // formatting
+
+    /**
+     * Try to derive column width based on column types. If result set is not 
small enough to be
+     * stored in java heap memory, we can't determine column widths based on 
column values.
+     */
+    static int[] columnWidthsByType(
+            List<Column> columns,
+            int maxColumnWidth,
+            boolean printNullAsEmpty,
+            boolean printRowKind) {
+        // fill width with field names first
+        final int[] colWidths = columns.stream().mapToInt(col -> 
col.getName().length()).toArray();
+
+        // determine proper column width based on types
+        for (int i = 0; i < columns.size(); ++i) {
+            LogicalType type = columns.get(i).getDataType().getLogicalType();
+            int len;
+            switch (type.getTypeRoot()) {
+                case TINYINT:
+                    len = TinyIntType.PRECISION + 1; // extra for negative 
value
+                    break;
+                case SMALLINT:
+                    len = SmallIntType.PRECISION + 1; // extra for negative 
value
+                    break;
+                case INTEGER:
+                    len = IntType.PRECISION + 1; // extra for negative value
+                    break;
+                case BIGINT:
+                    len = BigIntType.PRECISION + 1; // extra for negative value
+                    break;
+                case DECIMAL:
+                    len =
+                            ((DecimalType) type).getPrecision()
+                                    + 2; // extra for negative value and 
decimal point
+                    break;
+                case BOOLEAN:
+                    len = 5; // "true" or "false"
+                    break;
+                case DATE:
+                    len = 10; // e.g. 9999-12-31
+                    break;
+                case TIME_WITHOUT_TIME_ZONE:
+                    int precision = ((TimeType) type).getPrecision();
+                    len = precision == 0 ? 8 : precision + 9; // 
23:59:59[.999999999]
+                    break;
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                    precision = ((TimestampType) type).getPrecision();
+                    len = timestampTypeColumnWidth(precision);
+                    break;
+                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                    precision = ((LocalZonedTimestampType) 
type).getPrecision();
+                    len = timestampTypeColumnWidth(precision);
+                    break;
+                default:
+                    len = maxColumnWidth;
+            }
+
+            // adjust column width with potential null values
+            len = printNullAsEmpty ? len : Math.max(len, 
PrintStyle.NULL_VALUE.length());
+            colWidths[i] = Math.max(colWidths[i], len);
+        }
+
+        // add an extra column for row kind if necessary
+        if (printRowKind) {
+            final int[] ret = new int[columns.size() + 1];
+            ret[0] = ROW_KIND_COLUMN.length();
+            System.arraycopy(colWidths, 0, ret, 1, columns.size());
+            return ret;
+        } else {
+            return colWidths;
+        }
+    }
+
+    private static int[] columnWidthsByContent(
+            String[] columnNames, List<String[]> rows, int maxColumnWidth) {
+        // fill width with field names first
+        final int[] colWidths = 
Stream.of(columnNames).mapToInt(String::length).toArray();
+
+        // fill column width with real data
+        for (String[] row : rows) {
+            for (int i = 0; i < row.length; ++i) {
+                colWidths[i] = Math.max(colWidths[i], 
getStringDisplayWidth(row[i]));
+            }
+        }
+
+        // adjust column width with maximum length
+        for (int i = 0; i < colWidths.length; ++i) {
+            colWidths[i] = Math.min(colWidths[i], maxColumnWidth);
+        }
+
+        return colWidths;
+    }
+
+    /**
+     * Here we consider two popular class for timestamp: LocalDateTime and 
java.sql.Timestamp.

Review comment:
       This comment seems outdated. We use only internal types now.

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliTableauResultView.java
##########
@@ -126,34 +122,29 @@ private void checkAndCleanUpQuery(boolean cleanUpQuery) {
     private void printBatchResults(AtomicInteger receivedRowCount) {
         final List<RowData> resultRows = waitBatchResults();
         receivedRowCount.addAndGet(resultRows.size());
-        PrintUtils.printAsTableauForm(
-                resultDescriptor.getResultSchema(),
-                resultRows.iterator(),
-                terminal.writer(),
-                sessionTimeZone);
+        TableauStyle style =
+                PrintStyle.tableauWithDataInferredColumnWidths(
+                        resultDescriptor.getResultSchema(),
+                        resultDescriptor.getRowDataStringConverter(),
+                        PrintStyle.DEFAULT_MAX_COLUMN_WIDTH,
+                        false,
+                        false);
+        style.print(resultRows.iterator(), terminal.writer());
     }
 
     private void printStreamingResults(AtomicInteger receivedRowCount) {
-        List<Column> columns = resultDescriptor.getResultSchema().getColumns();
-        final String[] fieldNames =
-                Stream.concat(
-                                Stream.of(PrintUtils.ROW_KIND_COLUMN),
-                                columns.stream().map(Column::getName))
-                        .toArray(String[]::new);
-
-        final int[] colWidths =
-                PrintUtils.columnWidthsByType(
-                        columns,
+        TableauStyle style =
+                PrintStyle.tableauWithTypeInferredColumnWidths(
+                        resultDescriptor.getResultSchema(),
+                        resultDescriptor.getRowDataStringConverter(),

Review comment:
       Also `Executor#executeOperation` could simply return a custom POJO with 
a String inside.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
##########
@@ -248,96 +223,13 @@ public Builder setPrintStyle(PrintStyle printStyle) {
             return this;
         }
 
-        /** Specifies session time zone. */
-        public Builder setSessionTimeZone(ZoneId sessionTimeZone) {
-            Preconditions.checkNotNull(sessionTimeZone, "sessionTimeZone 
should not be null");
-            this.sessionTimeZone = sessionTimeZone;
-            return this;
-        }
-
         /** Returns a {@link TableResult} instance. */
         public TableResultInternal build() {
+            if (printStyle == null) {
+                printStyle = 
PrintStyle.rawContent(resultProvider.getRowDataStringConverter());

Review comment:
       nit: usually a builder's `build()` method should not its internal state 
but just pass the default on to the constructor

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StaticResultProvider.java
##########
@@ -64,6 +66,23 @@ public StaticResultProvider setJobClient(JobClient 
jobClient) {
         return CloseableIterator.adapterForIterator(this.rows.iterator());
     }
 
+    @Override
+    public RowDataToStringConverter getRowDataStringConverter() {
+        return rowData -> {
+            // This cast is safe because the values from the 
toInternalIterator are always going to
+            // be instanced as GenericRowData (look rowToInternalRow below).
+            GenericRowData genericRowData = (GenericRowData) rowData;
+            String[] results = new String[rowData.getArity()];
+            for (int i = 0; i < results.length; i++) {
+                results[i] =
+                        genericRowData.isNullAt(i)
+                                ? PrintStyle.NULL_VALUE
+                                : "" + genericRowData.getField(i);

Review comment:
       but this also means that we use `true`/`false` as string representation 
here for booleans. don't we want to put this into a separate method for each 
class that we support and throw an exception otherwise.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to