shen created FLINK-30989:
----------------------------

             Summary: Configuration table.exec.spill-compression.block-size not take effect in batch job
                 Key: FLINK-30989
                 URL: https://issues.apache.org/jira/browse/FLINK-30989
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream, Runtime / Configuration
    Affects Versions: 1.16.1
            Reporter: shen
         Attachments: image-2023-02-09-19-37-44-927.png
h1. Description

I tried to configure table.exec.spill-compression.block-size through the TableEnvironment in my job, but the setting did not take effect. I attached a debugger to the TaskManager and found that the Configuration passed to the constructor of [BinaryExternalSorter|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BinaryExternalSorter.java#L204] is empty:

!image-2023-02-09-19-37-44-927.png|width=306,height=185!

h1. How to reproduce

A simple program that reproduces the problem:

{code:java}
// App.java
package test.flink403;

import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

import java.util.Arrays;

public class App {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        config.set(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, true);
        config.set(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS, true);
        config.setString(TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.key(), "32 m"); // <---- does not take effect
        config.set(AlgorithmOptions.SORT_SPILLING_THRESHOLD, Float.valueOf(0.5f));

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, config);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.getConfig().set("table.exec.spill-compression.block-size", "32 m"); // <---- does not take effect

        final DataStream<Order> orderA =
                env.fromCollection(
                        Arrays.asList(
                                new Order(1L, "beer", 3),
                                new Order(1L, "diaper", 4),
                                new Order(3L, "rubber", 2)));

        final Table tableA = tableEnv.fromDataStream(orderA);
        // In batch mode this ORDER BY runs in SortOperator, which creates the BinaryExternalSorter.
        final Table result = tableEnv.sqlQuery("SELECT * FROM " + tableA + " order by user");

        tableEnv.toDataStream(result, Order.class).print();
        env.execute();
    }
}

// ---------------------------------------------------------------
// Order.java
package test.flink403;

public class Order {
    public Long user;
    public String product;
    public int amount;

    // for POJO detection in DataStream API
    public Order() {}

    // for structured type detection in Table API
    public Order(Long user, String product, int amount) {
        this.user = user;
        this.product = product;
        this.amount = amount;
    }

    @Override
    public String toString() {
        return "Order{"
                + "user=" + user
                + ", product='" + product + '\''
                + ", amount=" + amount
                + '}';
    }
}
{code}

I think this happens because [SortOperator|https://github.com/apache/flink/blob/release-1.16.1/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/SortOperator.java#L88] reads its configuration from the JobConfiguration, which has to be set in the JobGraph. The options configured in the program above never end up there, so the sorter receives an empty Configuration and falls back to the default values.

The following classes read their configuration from the JobConfiguration in the same way (the affected options are listed under each class):
* BinaryExternalSorter
** ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED
** ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES
** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
* BinaryHashTable, BaseHybridHashTable
** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED
** ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE
* SortDataInput
** AlgorithmOptions.SORT_SPILLING_THRESHOLD
** AlgorithmOptions.SPILLING_MAX_FAN
** AlgorithmOptions.USE_LARGE_RECORDS_HANDLER

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
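To illustrate why the setting is silently ignored rather than rejected, here is a minimal, Flink-free sketch of the lookup pattern described in this issue. It relies on the fact that an option missing from a Flink Configuration resolves to its default value; everything else is hypothetical: ConfigLookupSketch, the Option type and the two maps are illustrative stand-ins, not Flink APIs, and the "64 kb" default is an assumption (check ExecutionConfigOptions for the real value).

{code:java}
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, Flink-free model of the lookup described in this issue. None of these
 * names exist in Flink; they only mimic how reading an option from a configuration
 * that was never populated silently yields the option's default value.
 */
final class ConfigLookupSketch {

    /** Hypothetical stand-in for a ConfigOption: a key plus a default value. */
    static final class Option {
        final String key;
        final String defaultValue;

        Option(String key, String defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    // Assumed default of "64 kb"; the real default lives in ExecutionConfigOptions.
    static final Option BLOCK_SIZE =
            new Option("table.exec.spill-compression.block-size", "64 kb");

    /** Returns the configured value, or the option's default when the key is absent. */
    static String get(Map<String, String> conf, Option option) {
        return conf.getOrDefault(option.key, option.defaultValue);
    }

    public static void main(String[] args) {
        // Models what the user set via tableEnv.getConfig().set(...).
        Map<String, String> tableConfig = new HashMap<>();
        tableConfig.put(BLOCK_SIZE.key, "32 m");

        // Models what the operator actually receives: an empty job configuration.
        Map<String, String> jobConfiguration = new HashMap<>();

        System.out.println(get(tableConfig, BLOCK_SIZE));      // prints "32 m"
        System.out.println(get(jobConfiguration, BLOCK_SIZE)); // prints "64 kb"
    }
}
{code}

Because the missing key is not an error, nothing fails at runtime: the operator simply runs with the default block size, which matches the empty Configuration observed in the debugger.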