[
https://issues.apache.org/jira/browse/BEAM-1491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
yangping wu updated BEAM-1491:
------------------------------
Description:
Currently, if we want to read file store on HDFS, we will do it as follow:
{code}
PRead.Bounded<KV<LongWritable, Text>> from =
Read.from(HDFSFileSource.from("hdfs://hadoopserver:8020/tmp/data.txt",
TextInputFormat.class, LongWritable.class, Text.class));
PCollection<KV<LongWritable, Text>> data = p.apply(from);
{code}
or
{code}
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://hadoopserver:8020");
PRead.Bounded<KV<LongWritable, Text>> from =
Read.from(HDFSFileSource.from("/tmp/data.txt", TextInputFormat.class,
LongWritable.class, Text.class).withConfiguration(conf));
PCollection<KV<LongWritable, Text>> data = p.apply(from);
{code}
As we have seen above, we must be set {{hdfs://hadoopserver:8020}} in the file
path
if we can initialize {{conf}} by reading {{HADOOP_CONF_DIR}}({{YARN_CONF_DIR}})
environmen variable, then we can read HDFS file like this:
{code}
PRead.Bounded<KV<LongWritable, Text>> from =
Read.from(HDFSFileSource.from("/tmp/data.txt", TextInputFormat.class,
LongWritable.class, Text.class));
PCollection<KV<LongWritable, Text>> data = p.apply(from);
{code}
note we don't specify {{hdfs://hadoopserver:8020}} prefix, because the program
read it from {{HADOOP_CONF_DIR}}({{YARN_CONF_DIR}}) environmen, and the
program will read file from HDFS.
was:
Currently, if we want to read file store on HDFS, we will do it as follow:
{code} PCollection<KV<LongWritable, Text>> resultCollection =
p.apply(HDFSFileSource.readFrom(
"hdfs://hadoopserver:8020/tmp/data.txt",
TextInputFormat.class, LongWritable.class, Text.class));
{code}
As we have seen above, we must be set {{hdfs://hadoopserver:8020}} in the file
path, and we cann't set any variables when read file, because in
[HDFSFileSource.java|https://github.com/apache/beam/blob/master/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java#L310]
we initialize {{job}} instance as follow:
{code}
this.job = Job.getInstance();
{code}
we should initialize {{job}} instance by configure:
{code}
this.job = Job.getInstance(conf);
{code}
where {{conf}} is instance of {{Configuration}}, and we initialize {{conf}} by
reading {{HADOOP_CONF_DIR}}({{YARN_CONF_DIR}}) environmen variable,then we can
read HDFS file like this:
{code} PCollection<KV<LongWritable, Text>> resultCollection =
p.apply(HDFSFileSource.readFrom(
"/tmp/data.txt",
TextInputFormat.class, LongWritable.class, Text.class));
{code}
note we don't specify {{hdfs://hadoopserver:8020}} prefix, because the program
read it from {{HADOOP_CONF_DIR}}({{YARN_CONF_DIR}}) environmen.
> HDFSFileSource should be able to read the HADOOP_CONF_DIR(YARN_CONF_DIR)
> environmen variable
> --------------------------------------------------------------------------------------------
>
> Key: BEAM-1491
> URL: https://issues.apache.org/jira/browse/BEAM-1491
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Affects Versions: 0.5.0
> Reporter: yangping wu
> Assignee: Jean-Baptiste Onofré
>
> Currently, if we want to read file store on HDFS, we will do it as follow:
> {code}
> PRead.Bounded<KV<LongWritable, Text>> from =
> Read.from(HDFSFileSource.from("hdfs://hadoopserver:8020/tmp/data.txt",
> TextInputFormat.class, LongWritable.class, Text.class));
> PCollection<KV<LongWritable, Text>> data = p.apply(from);
> {code}
> or
> {code}
> Configuration conf = new Configuration();
> conf.set("fs.default.name", "hdfs://hadoopserver:8020");
> PRead.Bounded<KV<LongWritable, Text>> from =
> Read.from(HDFSFileSource.from("/tmp/data.txt", TextInputFormat.class,
> LongWritable.class, Text.class).withConfiguration(conf));
> PCollection<KV<LongWritable, Text>> data = p.apply(from);
> {code}
> As we have seen above, we must be set {{hdfs://hadoopserver:8020}} in the
> file path
> if we can initialize {{conf}} by reading
> {{HADOOP_CONF_DIR}}({{YARN_CONF_DIR}}) environmen variable, then we can read
> HDFS file like this:
> {code}
> PRead.Bounded<KV<LongWritable, Text>> from =
> Read.from(HDFSFileSource.from("/tmp/data.txt", TextInputFormat.class,
> LongWritable.class, Text.class));
> PCollection<KV<LongWritable, Text>> data = p.apply(from);
> {code}
> note we don't specify {{hdfs://hadoopserver:8020}} prefix, because the
> program read it from {{HADOOP_CONF_DIR}}({{YARN_CONF_DIR}}) environmen, and
> the program will read file from HDFS.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)