This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new c727e36  [fix] cache loader cannot be serialized (#63)
c727e36 is described below

commit c727e36de804570da6ba5952e77e7160d000a2bf
Author: gnehil <[email protected]>
AuthorDate: Thu Dec 22 17:59:14 2022 +0800

    [fix] cache loader cannot be serialized (#63)
---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 33 ++++++++++++++--------
 1 file changed, 21 insertions(+), 12 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 8cebe68..25ed7b1 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -94,12 +94,7 @@ public class DorisStreamLoad implements Serializable{
         this.streamLoadProp=getStreamLoadProp(settings);
         cache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
-                .build(new CacheLoader<String, List<BackendV2.BackendRowV2>>() {
-                    @Override
-                    public List<BackendV2.BackendRowV2> load(String key) throws IOException, DorisException {
-                        return RestService.getBackendRows(settings, LOG);
-                    }
-                });
+                .build(new BackendCacheLoader(settings));
     }
 
     public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOException, DorisException {
@@ -118,12 +113,7 @@ public class DorisStreamLoad implements Serializable{
         this.streamLoadProp=getStreamLoadProp(settings);
         cache = CacheBuilder.newBuilder()
                 .expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
-                .build(new CacheLoader<String, List<BackendV2.BackendRowV2>>() {
-                    @Override
-                    public List<BackendV2.BackendRowV2> load(String key) throws IOException, DorisException {
-                        return RestService.getBackendRows(settings, LOG);
-                    }
-                });
+                .build(new BackendCacheLoader(settings));
     }
 
     public String getLoadUrlStr() {
@@ -323,4 +313,23 @@ public class DorisStreamLoad implements Serializable{
             throw new RuntimeException("get backends info fail",e);
         }
     }
+
+    /**
+     * serializable be cache loader
+     */
+    private static class BackendCacheLoader extends CacheLoader<String, List<BackendV2.BackendRowV2>> implements Serializable {
+
+        private final SparkSettings settings;
+
+        public BackendCacheLoader(SparkSettings settings) {
+            this.settings = settings;
+        }
+
+        @Override
+        public List<BackendV2.BackendRowV2> load(String key) throws Exception {
+            return RestService.getBackendRows(settings, LOG);
+        }
+
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to