This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-hive.git
commit d96d03570762611045f6efc1bcb226b0c05c1f72 Author: Rui Fan <1996fan...@gmail.com> AuthorDate: Fri Jan 12 17:12:25 2024 +0800 [FLINK-34081][configuration] Refactor all callers of deprecated `getXxx(ConfigOption<Xxx>)`, `getXxx(ConfigOption<Xxx>, Xxx)` and `setXxx(ConfigOption<Integer>, Xxx)` methods of Configuration --- .../main/java/org/apache/flink/connectors/hive/HiveTableSink.java | 8 ++++---- .../java/org/apache/flink/connectors/hive/HiveTableSource.java | 6 +++--- .../security/token/HiveServer2DelegationTokenProviderITCase.java | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java index 779de538..f55fed51 100644 --- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java +++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java @@ -210,7 +210,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su // the table's option "SINK_PARTITION_COMMIT_POLICY_KIND" should contain 'metastore' org.apache.flink.configuration.Configuration flinkConf = org.apache.flink.configuration.Configuration.fromMap(catalogTable.getOptions()); - String policyKind = flinkConf.getString(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND); + String policyKind = flinkConf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND); String[] policyStrings = policyKind.split(","); Arrays.stream(policyStrings) .filter(policy -> policy.equalsIgnoreCase(PartitionCommitPolicy.METASTORE)) @@ -383,7 +383,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration(); catalogTable.getOptions().forEach(conf::setString); - boolean autoCompaction = conf.getBoolean(FileSystemConnectorOptions.AUTO_COMPACTION); + boolean autoCompaction = conf.get(FileSystemConnectorOptions.AUTO_COMPACTION); if (autoCompaction) { Optional<Integer> compactParallelismOptional = conf.getOptional(FileSystemConnectorOptions.COMPACTION_PARALLELISM); @@ -638,7 +638,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su catalogTable.getOptions().forEach(conf::setString); String commitPolicies = - conf.getString(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND); + conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND); if (!getPartitionKeys().isEmpty() && StringUtils.isNullOrWhitespaceOnly(commitPolicies)) { throw new FlinkHiveException( String.format( @@ -648,7 +648,7 @@ public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, Su FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key())); } - boolean autoCompaction = conf.getBoolean(FileSystemConnectorOptions.AUTO_COMPACTION); + boolean autoCompaction = conf.get(FileSystemConnectorOptions.AUTO_COMPACTION); if (autoCompaction) { fileNamingBuilder.withPartPrefix( convertToUncompacted(fileNamingBuilder.build().getPartPrefix())); diff --git a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java index a1dac332..91931300 100644 --- a/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java +++ b/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java @@ -429,7 +429,7 @@ public class HiveTableSource case PARTITION_NAME: if (configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET)) { String consumeOffsetStr = - configuration.getString(STREAMING_SOURCE_CONSUME_START_OFFSET); + configuration.get(STREAMING_SOURCE_CONSUME_START_OFFSET); consumeStartOffset = (T) consumeOffsetStr; } else { consumeStartOffset = (T) DEFAULT_MIN_NAME_OFFSET; @@ -440,12 +440,12 @@ public class HiveTableSource case CREATE_TIME: if (configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET)) { String consumeOffsetStr = - configuration.getString(STREAMING_SOURCE_CONSUME_START_OFFSET); + configuration.get(STREAMING_SOURCE_CONSUME_START_OFFSET); LocalDateTime localDateTime = DefaultPartTimeExtractor.toLocalDateTime( consumeOffsetStr, - configuration.getString( + configuration.get( PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER)); consumeStartOffset = diff --git a/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java b/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java index 0bce268e..14c6c4eb 100644 --- a/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java +++ b/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java @@ -108,8 +108,8 @@ public class HiveServer2DelegationTokenProviderITCase { new org.apache.flink.configuration.Configuration(); final Path keyTab = Files.createFile(tmpDir.resolve("test.keytab")); - configuration.setString(KERBEROS_LOGIN_KEYTAB, keyTab.toAbsolutePath().toString()); - configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, "t...@example.com"); + configuration.set(KERBEROS_LOGIN_KEYTAB, keyTab.toAbsolutePath().toString()); + configuration.set(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, "t...@example.com"); provider.init(configuration); boolean result = provider.delegationTokensRequired(); assertTrue(result);