This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 26af028  [FLINK-18197][hive] Add more logs for hive streaming integration
26af028 is described below

commit 26af028d742f6ae54d62fc40c3e240270a0aadbc
Author: Rui Li <[email protected]>
AuthorDate: Mon Jun 15 16:13:27 2020 +0800

    [FLINK-18197][hive] Add more logs for hive streaming integration
    
    This closes #12625
---
 .../hive/read/HiveContinuousMonitoringFunction.java         |  2 ++
 .../flink/table/filesystem/FileSystemLookupFunction.java    | 13 +++++++++++++
 .../flink/table/filesystem/MetastoreCommitPolicy.java       | 11 +++++++----
 .../flink/table/filesystem/PartitionCommitPolicy.java       | 12 ++++++++++++
 .../flink/table/filesystem/SuccessFileCommitPolicy.java     |  6 ++++++
 .../table/filesystem/stream/StreamingFileCommitter.java     |  1 +
 6 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
index 893e880..dff6580 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
@@ -293,6 +293,8 @@ public class HiveContinuousMonitoringFunction
                                if (timestamp > maxTimestamp) {
                                        maxTimestamp = timestamp;
                                }
+                               LOG.info("Found new partition {} of table {}, 
forwarding splits to downstream readers",
+                                               partSpec, 
tablePath.getFullName());
                                HiveTableInputSplit[] splits = 
HiveTableInputFormat.createInputSplits(
                                                this.readerParallelism,
                                                
Collections.singletonList(toHiveTablePartition(partition)),
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
index 7219d49..e3bed30 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemLookupFunction.java
@@ -37,6 +37,9 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -54,6 +57,8 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
 
        private static final long serialVersionUID = 1L;
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemLookupFunction.class);
+
        private final InputFormat<RowData, T> inputFormat;
        // names and types of the records returned by the input format
        private final String[] producedNames;
@@ -133,14 +138,21 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
                if (nextLoadTime > System.currentTimeMillis()) {
                        return;
                }
+               if (nextLoadTime > 0) {
+                       LOG.info("Lookup join cache has expired after {} 
minute(s), reloading", getCacheTTL().toMinutes());
+               } else {
+                       LOG.info("Populating lookup join cache");
+               }
                cache.clear();
                try {
                        T[] inputSplits = inputFormat.createInputSplits(1);
                        GenericRowData reuse = new 
GenericRowData(producedNames.length);
+                       long count = 0;
                        for (T split : inputSplits) {
                                inputFormat.open(split);
                                while (!inputFormat.reachedEnd()) {
                                        RowData row = 
inputFormat.nextRecord(reuse);
+                                       count++;
                                        Row key = extractKey(row);
                                        List<RowData> rows = 
cache.computeIfAbsent(key, k -> new ArrayList<>());
                                        rows.add(serializer.copy(row));
@@ -148,6 +160,7 @@ public class FileSystemLookupFunction<T extends InputSplit> extends TableFunctio
                                inputFormat.close();
                        }
                        nextLoadTime = System.currentTimeMillis() + 
getCacheTTL().toMillis();
+                       LOG.info("Loaded {} row(s) into lookup join cache", 
count);
                } catch (IOException e) {
                        throw new FlinkRuntimeException("Failed to load table 
into cache", e);
                }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/MetastoreCommitPolicy.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/MetastoreCommitPolicy.java
index 46f5d3c..cd56e9c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/MetastoreCommitPolicy.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/MetastoreCommitPolicy.java
@@ -20,6 +20,9 @@ package org.apache.flink.table.filesystem;
 
 import org.apache.flink.table.filesystem.TableMetaStoreFactory.TableMetaStore;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.LinkedHashMap;
 
 /**
@@ -30,6 +33,8 @@ import java.util.LinkedHashMap;
  */
 public class MetastoreCommitPolicy implements PartitionCommitPolicy {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(MetastoreCommitPolicy.class);
+
        private TableMetaStore metaStore;
 
        public void setMetastore(TableMetaStore metaStore) {
@@ -38,10 +43,8 @@ public class MetastoreCommitPolicy implements PartitionCommitPolicy {
 
        @Override
        public void commit(Context context) throws Exception {
-               LinkedHashMap<String, String> partitionSpec = new 
LinkedHashMap<>();
-               for (int i = 0; i < context.partitionKeys().size(); i++) {
-                       partitionSpec.put(context.partitionKeys().get(i), 
context.partitionValues().get(i));
-               }
+               LinkedHashMap<String, String> partitionSpec = 
context.partitionSpec();
                metaStore.createOrAlterPartition(partitionSpec, 
context.partitionPath());
+               LOG.info("Committed partition {} to metastore", partitionSpec);
        }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionCommitPolicy.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionCommitPolicy.java
index d2c2c7c..8b2cc1c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionCommitPolicy.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionCommitPolicy.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.ValidationException;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -90,6 +91,17 @@ public interface PartitionCommitPolicy {
                 * Path of this partition.
                 */
                Path partitionPath();
+
+               /**
+                * Partition spec in the form of a map from partition keys to 
values.
+                */
+               default LinkedHashMap<String, String> partitionSpec() {
+                       LinkedHashMap<String, String> res = new 
LinkedHashMap<>();
+                       for (int i = 0; i < partitionKeys().size(); i++) {
+                               res.put(partitionKeys().get(i), 
partitionValues().get(i));
+                       }
+                       return res;
+               }
        }
 
        /**
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SuccessFileCommitPolicy.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SuccessFileCommitPolicy.java
index 4472e54..94d6a46 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SuccessFileCommitPolicy.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/SuccessFileCommitPolicy.java
@@ -21,12 +21,17 @@ package org.apache.flink.table.filesystem;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Partition commit policy to add success file to directory. Success file is 
configurable and
  * empty file.
  */
 public class SuccessFileCommitPolicy implements PartitionCommitPolicy {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(SuccessFileCommitPolicy.class);
+
        private final String fileName;
        private final FileSystem fileSystem;
 
@@ -40,5 +45,6 @@ public class SuccessFileCommitPolicy implements PartitionCommitPolicy {
                fileSystem.create(
                                new Path(context.partitionPath(), fileName),
                                FileSystem.WriteMode.OVERWRITE).close();
+               LOG.info("Committed partition {} with success file", 
context.partitionSpec());
        }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
index ec63faf..526970e 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
@@ -156,6 +156,7 @@ public class StreamingFileCommitter extends AbstractStreamOperator<Void>
                try (TableMetaStoreFactory.TableMetaStore metaStore = 
metaStoreFactory.createTableMetaStore()) {
                        for (String partition : partitions) {
                                LinkedHashMap<String, String> partSpec = 
extractPartitionSpecFromPath(new Path(partition));
+                               LOG.info("Partition {} of table {} is ready to 
be committed", partSpec, tableIdentifier);
                                Path path = new Path(locationPath, 
generatePartitionPath(partSpec));
                                PartitionCommitPolicy.Context context = new 
PolicyContext(
                                                new 
ArrayList<>(partSpec.values()), path);

Reply via email to