wuchong commented on a change in pull request #12126:
URL: https://github.com/apache/flink/pull/12126#discussion_r424907675



##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/BlackHoleITCase.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.table.factories.BlackHoleTableSinkFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * End to end tests for {@link BlackHoleTableSinkFactory}.
+ */
+public class BlackHoleITCase extends StreamingTestBase {
+
+       @Test
+       public void testTypes() {
+               tEnv().executeSql(
+                               "create table blackhole_t (f0 int, f1 double) with ('connector' = 'blackhole')");
+               execInsertTableAndWaitResult(
+                               tEnv().fromValues(Arrays.asList(Row.of(1, 1.1), Row.of(2, 2.2))),

Review comment:
       Can be simplified to `tEnv().fromValues(row(1, 1.1), row(2, 2.2))` by using `org.apache.flink.table.api.Expressions.row`. See the sketch below.
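       A minimal sketch of the suggested form, assuming a static import of `Expressions.row` (the surrounding test scaffolding stays as in the diff; the trailing arguments of `execInsertTableAndWaitResult` are truncated in this hunk and left out here):

```java
import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.api.Table;

// fromValues(Expression...) accepts row expressions directly, so the
// Arrays.asList(Row.of(...)) wrapping can be dropped:
Table values = tEnv().fromValues(row(1, 1.1), row(2, 2.2));
```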

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/PrintTableSinkFactory.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.util.PrintSinkOutputWriter;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Print table sink factory writing every row to the standard output or standard error stream.
+ * It is designed for:
+ * - easy testing of streaming jobs.
+ * - debugging in production.
+ *
+ * <p>
+ * Four possible format options:
+ *     {@code PRINT_IDENTIFIER}:taskId> output  <- {@code PRINT_IDENTIFIER} provided, parallelism > 1
+ *     {@code PRINT_IDENTIFIER}> output         <- {@code PRINT_IDENTIFIER} provided, parallelism == 1
+ *     taskId> output                           <- no {@code PRINT_IDENTIFIER} provided, parallelism > 1
+ *     output                                   <- no {@code PRINT_IDENTIFIER} provided, parallelism == 1
+ * </p>
+ *
+ * <p>The output string format is "$RowKind(f0,f1,f2...)", for example: "+I(1,1)".
+ */
+@PublicEvolving
+public class PrintTableSinkFactory implements DynamicTableSinkFactory {
+
+       public static final String IDENTIFIER = "print";
+
+       public static final ConfigOption<String> PRINT_IDENTIFIER = key("print-identifier")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Message that identifies the print sink and is prefixed to the output value.");
+
+       public static final ConfigOption<Boolean> STANDARD_ERROR = key("standard-error")
+                       .booleanType()
+                       .defaultValue(false)
+                       .withDescription("True, if the format should print to standard error instead of standard out.");
+
+       @Override
+       public String factoryIdentifier() {
+               return IDENTIFIER;
+       }
+
+       @Override
+       public Set<ConfigOption<?>> requiredOptions() {
+               return new HashSet<>();
+       }
+
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               Set<ConfigOption<?>> options = new HashSet<>();
+               options.add(PRINT_IDENTIFIER);
+               options.add(STANDARD_ERROR);
+               return options;
+       }
+
+       @Override
+       public DynamicTableSink createDynamicTableSink(Context context) {
+               FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+               helper.validate();
+               ReadableConfig options = helper.getOptions();
+               return new PrintSink(
+                               context.getCatalogTable().getSchema().toPhysicalRowDataType(),
+                               options.get(PRINT_IDENTIFIER),
+                               options.get(STANDARD_ERROR));
+       }
+
+       private static class PrintSink implements DynamicTableSink {
+
+               private final DataType type;
+               private final String printIdentifier;
+               private final boolean stdErr;
+
+               private PrintSink(DataType type, String printIdentifier, boolean stdErr) {
+                       this.type = type;
+                       this.printIdentifier = printIdentifier;
+                       this.stdErr = stdErr;
+               }
+
+               @Override
+               public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+                       return requestedMode;
+               }
+
+               @Override
+               public SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
+                       DataStructureConverter converter = context.createDataStructureConverter(type);
+                       return SinkFunctionProvider.of(new RowDataPrintFunction(converter, printIdentifier, stdErr));
+               }
+
+               @Override
+               public DynamicTableSink copy() {
+                       return new PrintSink(type, printIdentifier, stdErr);
+               }
+
+               @Override
+               public String asSummaryString() {
+                       return "Print to " + (stdErr ? "System.err" : "System.out");
+               }
+       }
+
+       /**
+        * Implementation of the SinkFunction converting {@link RowData} to string and
+        * passing to {@link PrintSinkFunction}.
+        */
+       private static class RowDataPrintFunction extends RichSinkFunction<RowData> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final DataStructureConverter converter;
+               private final PrintSinkOutputWriter<String> writer;
+
+               private RowDataPrintFunction(
+                               DataStructureConverter converter, String printIdentifier, boolean stdErr) {
+                       this.converter = converter;
+                       this.writer = new PrintSinkOutputWriter<>(printIdentifier, stdErr);
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+                       StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+                       writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
+               }
+
+               @Override
+               public void invoke(RowData value, Context context) {
+                       String rowKind = value.getRowKind().shortString();
+                       Object data = converter.toExternal(value);
+                       writer.write(rowKind + "(" + data + ")");
+               }
+
+               @Override
+               public String toString() {
+                       return writer.toString();
+               }

Review comment:
       Do we need this `toString()` override?

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/PrintITCase.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.table.factories.PrintTableSinkFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * End to end tests for {@link PrintTableSinkFactory}.
+ */
+public class PrintITCase extends StreamingTestBase {

Review comment:
       `PrintConnectorITCase`? Make it more explicit that this is not an IT case for `Table#executeInsert()#print()`.

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/BlackHoleITCase.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.table.factories.BlackHoleTableSinkFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * End to end tests for {@link BlackHoleTableSinkFactory}.
+ */
+public class BlackHoleITCase extends StreamingTestBase {

Review comment:
       `BlackHoleConnectorITCase`?

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/PrintITCase.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.table.factories.PrintTableSinkFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * End to end tests for {@link PrintTableSinkFactory}.
+ */
+public class PrintITCase extends StreamingTestBase {
+
+       private final PrintStream originalSystemOut = System.out;
+       private final PrintStream originalSystemErr = System.err;
+
+       private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+       private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream();
+
+       @Before
+       public void setUp() {
+               System.setOut(new PrintStream(arrayOutputStream));
+               System.setErr(new PrintStream(arrayErrorStream));
+       }
+
+       @After
+       public void tearDown() {
+               if (System.out != originalSystemOut) {
+                       System.out.close();
+               }
+               if (System.err != originalSystemErr) {
+                       System.err.close();
+               }
+               System.setOut(originalSystemOut);
+               System.setErr(originalSystemErr);
+       }
+
+       @Test
+       public void testTypes() {
+               test(false);
+       }
+
+       @Test
+       public void testStandardError() {
+               test(true);
+       }
+
+       private void test(boolean standardError) {
+               tEnv().executeSql(String.format("create table print_t (" +

Review comment:
       Could you format the DDL a bit? Put every column on its own line, e.g. as in the sketch below.
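       One possible layout, purely for illustration; the actual column list is truncated in this hunk, so the columns shown here are hypothetical, and `'standard-error'` is the option defined by `PrintTableSinkFactory` above:

```java
// Hypothetical reformatting sketch -- the real column list is not visible in the diff.
tEnv().executeSql(String.format(
        "create table print_t (" +
        "  f0 int," +            // one column per source line, as requested
        "  f1 double" +
        ") with (" +
        "  'connector' = 'print'," +
        "  'standard-error' = '%s')", standardError));
```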




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

