[
https://issues.apache.org/jira/browse/FLINK-24973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
anigkus updated FLINK-24973:
----------------------------
Description:
{code:java}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.myorg.quickstart;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import java.io.File;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
public class DistributedCache {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
//1、register a file from HDFS
//env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
env.registerCachedFile("file:///tmp/tmp.txt", "localFile",true);
DataSource<String> data = env.fromElements("a", "b", "c", "d");
data.map(new RichMapFunction<String, String>() {
private List<String> cache = new ArrayList<String>();
@Override
public void open(Configuration parameters) throws Exception {
//super.open(parameters);
//2、access cached file via RuntimeContext and DistributedCache
File file =
getRuntimeContext().getDistributedCache().getFile("localFile");
List<String> lines = FileUtils.readLines(file,"UTF-8");
for (String line : lines) {
cache.add(line);
System.out.println("line=[" + line + "]");
}
}
@Override
public String map(String value) throws Exception {
//value=["a", "b", "c", "d"];
return value;
}
}).print();
}
} {code}
#/tmp/tmp.txt–>this file existe
List<String> lines = FileUtils.readLines(file,"UTF-8"); //lines.size()=0
Why....
was:
{code:java}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.myorg.quickstart;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import java.io.File;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
/**
* Copyright © DEEPEXI Technologies Co., Ltd. 2018-2020. All rights reserved.
*
* @Author zhangchunping
* @Date 11/21/21 12:30 PM
* @Description ???
*/
public class DistributedCache {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
//1、register a file from HDFS
//env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
env.registerCachedFile("file:///tmp/tmp.txt", "localFile",true);
DataSource<String> data = env.fromElements("a", "b", "c", "d");
data.map(new RichMapFunction<String, String>() {
private List<String> cache = new ArrayList<String>();
@Override
public void open(Configuration parameters) throws Exception {
//super.open(parameters);
//2、access cached file via RuntimeContext and DistributedCache
File file =
getRuntimeContext().getDistributedCache().getFile("localFile");
List<String> lines = FileUtils.readLines(file,"UTF-8");
for (String line : lines) {
cache.add(line);
System.out.println("line=[" + line + "]");
}
}
@Override
public String map(String value) throws Exception {
//value=["a", "b", "c", "d"];
return value;
}
}).print();
}
} {code}
#/tmp/tmp.txt–>this file existe
List<String> lines = FileUtils.readLines(file,"UTF-8"); //lines.size()=0
Why....
> flink registercachedfile example no effect
> ------------------------------------------
>
> Key: FLINK-24973
> URL: https://issues.apache.org/jira/browse/FLINK-24973
> Project: Flink
> Issue Type: Technical Debt
> Components: API / Core
> Affects Versions: shaded-14.0
> Reporter: anigkus
> Priority: Minor
>
> {code:java}
> /*
> * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> * (the "License"); you may not use this file except in compliance with
> * the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.myorg.quickstart;
> import org.apache.commons.io.FileUtils;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.configuration.Configuration;
> import java.io.File;
> import java.nio.charset.Charset;
> import java.util.ArrayList;
> import java.util.List;
> public class DistributedCache {
> public static void main(String[] args) throws Exception {
> ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> //1、register a file from HDFS
> //env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
> env.registerCachedFile("file:///tmp/tmp.txt", "localFile",true);
> DataSource<String> data = env.fromElements("a", "b", "c", "d");
> data.map(new RichMapFunction<String, String>() {
> private List<String> cache = new ArrayList<String>();
> @Override
> public void open(Configuration parameters) throws Exception {
> //super.open(parameters);
> //2、access cached file via RuntimeContext and DistributedCache
> File file =
> getRuntimeContext().getDistributedCache().getFile("localFile");
> List<String> lines = FileUtils.readLines(file,"UTF-8");
>
> for (String line : lines) {
> cache.add(line);
> System.out.println("line=[" + line + "]");
> }
> }
> @Override
> public String map(String value) throws Exception {
> //value=["a", "b", "c", "d"];
> return value;
> }
> }).print();
> }
> } {code}
> #/tmp/tmp.txt–>this file existe
> List<String> lines = FileUtils.readLines(file,"UTF-8"); //lines.size()=0
> Why....
--
This message was sent by Atlassian Jira
(v8.20.1#820001)