[
https://issues.apache.org/jira/browse/FLINK-1910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chesnay Schepler updated FLINK-1910:
------------------------------------
Description:
{code:java}
public static LinkedList values=new LinkedList<String>();
public static void main(String[] args) throws Exception
{
values.add("AUTOMOBILE");
values.add("XSTf4&&NCwDVaWNe6tEgvwfmRchLXak");
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
DataSet<Customer> customers = getCustomerDataSet(env);
customers = customers.filter(
new FilterFunction<Customer>() {
@Override
public boolean filter(Customer c) {
return c.getField(4).equals(values.get(0).toString()) &&
c.getField(2).equals(values.get(1).toString()) ;
}
});
System.out.println(customers.print());
customers.writeAsCsv("/home/hadoop/Desktop/Dataset/output.csv", "\n",
"|");
env.execute();
}
public static class Customer extends Tuple5<Long,String,String,String,String> {
}
private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env) {
return env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv")
.fieldDelimiter('|')
.includeFields("11100110").ignoreFirstLine()
.tupleType(Customer.class);
}
{code}
Environment: (was: public static LinkedList values=new
LinkedList<String>();
public static void main(String[] args) throws Exception {
values.add("AUTOMOBILE");
values.add("XSTf4&&NCwDVaWNe6tEgvwfmRchLXak");
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
DataSet<Customer> customers = getCustomerDataSet(env);
customers = customers.filter(
new FilterFunction<Customer>() {
@Override
public boolean filter(Customer c) {
return c.getField(4).equals(values.get(0).toString()) &&
c.getField(2).equals(values.get(1).toString()) ;
}
});
System.out.println(customers.print());
customers.writeAsCsv("/home/hadoop/Desktop/Dataset/output.csv", "\n",
"|");
env.execute();
}
public static class Customer extends
Tuple5<Long,String,String,String,String> {
}
private static DataSet<Customer>
getCustomerDataSet(ExecutionEnvironment env) {
return env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv")
.fieldDelimiter('|')
.includeFields("11100110").ignoreFirstLine()
.tupleType(Customer.class);
})
> why this code flink not reurn value when use variable in filter
> ---------------------------------------------------------------
>
> Key: FLINK-1910
> URL: https://issues.apache.org/jira/browse/FLINK-1910
> Project: Flink
> Issue Type: Bug
> Reporter: hagersaleh
>
> {code:java}
> public static LinkedList values=new LinkedList<String>();
> public static void main(String[] args) throws Exception
> {
> values.add("AUTOMOBILE");
> values.add("XSTf4&&NCwDVaWNe6tEgvwfmRchLXak");
> ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> DataSet<Customer> customers = getCustomerDataSet(env);
> customers = customers.filter(
> new FilterFunction<Customer>() {
> @Override
> public boolean filter(Customer c) {
> return c.getField(4).equals(values.get(0).toString()) &&
> c.getField(2).equals(values.get(1).toString()) ;
> }
> });
> System.out.println(customers.print());
> customers.writeAsCsv("/home/hadoop/Desktop/Dataset/output.csv", "\n",
> "|");
> env.execute();
> }
> public static class Customer extends Tuple5<Long,String,String,String,String>
> {
> }
> private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env)
> {
> return env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv")
> .fieldDelimiter('|')
> .includeFields("11100110").ignoreFirstLine()
> .tupleType(Customer.class);
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)