[ https://issues.apache.org/jira/browse/SPARK-7032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lior Chaga updated SPARK-7032: ------------------------------ Description: When using a UNION/EXCEPT clause together with a GROUP BY clause in Spark SQL, the results do not match what is expected. In the following example, only 1 record should be in the first table and not in the second (since, when grouping by the key field, the counter for key=1 is 10 in both tables). Each of the clauses works properly when run on its own. {code} //import com.addthis.metrics.reporter.config.ReporterConfig; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.api.java.JavaSQLContext; import org.apache.spark.sql.api.java.Row; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class SimpleApp { public static void main(String[] args) throws IOException { SparkConf conf = new SparkConf().setAppName("Simple Application") .setMaster("local[1]"); JavaSparkContext sc = new JavaSparkContext(conf); List<MyObject> firstList = new ArrayList<MyObject>(2); firstList.add(new MyObject(1, 10)); firstList.add(new MyObject(2, 10)); List<MyObject> secondList = new ArrayList<MyObject>(3); secondList.add(new MyObject(1, 4)); secondList.add(new MyObject(1, 6)); secondList.add(new MyObject(2, 8)); JavaRDD<MyObject> firstRdd = sc.parallelize(firstList); JavaRDD<MyObject> secondRdd = sc.parallelize(secondList); JavaSQLContext sqlc = new JavaSQLContext(sc); sqlc.applySchema(firstRdd, MyObject.class).registerTempTable("table1"); sqlc.sqlContext().cacheTable("table1"); sqlc.applySchema(secondRdd, MyObject.class).registerTempTable("table2"); sqlc.sqlContext().cacheTable("table2"); List<Row> firstMinusSecond = sqlc.sql( "SELECT key, counter FROM table1 " + "EXCEPT " + "SELECT key, SUM(counter) FROM table2 " + "GROUP BY key ").collect(); System.out.println("num of rows in first but not in second = [" + firstMinusSecond.size() + "]"); sc.close(); System.exit(0); } 
public static class MyObject implements Serializable { public MyObject(Integer key, Integer counter) { this.key = key; this.counter = counter; } private Integer key; private Integer counter; public Integer getKey() { return key; } public void setKey(Integer key) { this.key = key; } public Integer getCounter() { return counter; } public void setCounter(Integer counter) { this.counter = counter; } } } {code} was: When using a UNION/EXCEPT clause together with a GROUP BY clause in Spark SQL, the results do not match what is expected. In the following example, only 1 record should be in the first table and not in the second (since, when grouping by the key field, the counter for key=1 is 10 in both tables). Each of the clauses works properly when run on its own. {code} //import com.addthis.metrics.reporter.config.ReporterConfig; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.api.java.JavaSQLContext; import org.apache.spark.sql.api.java.Row; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class SimpleApp { public static void main(String[] args) throws IOException { SparkConf conf = new SparkConf().setAppName("Simple Application") .setMaster("local[1]"); JavaSparkContext sc = new JavaSparkContext(conf); List<MyObject> firstList = new ArrayList<MyObject>(2); firstList.add(new MyObject(1, 10)); firstList.add(new MyObject(2, 10)); List<MyObject> secondList = new ArrayList<MyObject>(3); secondList.add(new MyObject(1, 4)); secondList.add(new MyObject(1, 6)); secondList.add(new MyObject(2, 8)); JavaRDD<MyObject> firstRdd = sc.parallelize(firstList); JavaRDD<MyObject> secondRdd = sc.parallelize(firstList); JavaSQLContext sqlc = new JavaSQLContext(sc); sqlc.applySchema(firstRdd, MyObject.class).registerTempTable("table1"); sqlc.sqlContext().cacheTable("table1"); sqlc.applySchema(secondRdd, 
MyObject.class).registerTempTable("table2"); sqlc.sqlContext().cacheTable("table2"); List<Row> firstMinusSecond = sqlc.sql( "SELECT key, counter FROM table1 " + "EXCEPT " + "SELECT key, SUM(counter) FROM table2 " + "GROUP BY key ").collect(); System.out.println("num of rows in first but not in second = [" + firstMinusSecond.size() + "]"); sc.close(); System.exit(0); } public static class MyObject implements Serializable { public MyObject(Integer key, Integer counter) { this.key = key; this.counter = counter; } private Integer key; private Integer counter; public Integer getKey() { return key; } public void setKey(Integer key) { this.key = key; } public Integer getCounter() { return counter; } public void setCounter(Integer counter) { this.counter = counter; } } } {code} > SparkSQL incorrect results when using UNION/EXCEPT with GROUP BY clause > ----------------------------------------------------------------------- > > Key: SPARK-7032 > URL: https://issues.apache.org/jira/browse/SPARK-7032 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 1.2.2 > Reporter: Lior Chaga > > When using a UNION/EXCEPT clause together with a GROUP BY clause in Spark SQL, the > results do not match what is expected. > In the following example, only 1 record should be in the first table and not in > the second (since, when grouping by the key field, the counter for key=1 is 10 in > both tables). > Each of the clauses works properly when run on its own. 
> {code} > //import com.addthis.metrics.reporter.config.ReporterConfig; > import org.apache.spark.SparkConf; > import org.apache.spark.api.java.JavaRDD; > import org.apache.spark.api.java.JavaSparkContext; > import org.apache.spark.sql.api.java.JavaSQLContext; > import org.apache.spark.sql.api.java.Row; > import java.io.IOException; > import java.io.Serializable; > import java.util.ArrayList; > import java.util.List; > public class SimpleApp { > public static void main(String[] args) throws IOException { > SparkConf conf = new SparkConf().setAppName("Simple Application") > .setMaster("local[1]"); > JavaSparkContext sc = new JavaSparkContext(conf); > List<MyObject> firstList = new ArrayList<MyObject>(2); > firstList.add(new MyObject(1, 10)); > firstList.add(new MyObject(2, 10)); > List<MyObject> secondList = new ArrayList<MyObject>(3); > secondList.add(new MyObject(1, 4)); > secondList.add(new MyObject(1, 6)); > secondList.add(new MyObject(2, 8)); > JavaRDD<MyObject> firstRdd = sc.parallelize(firstList); > JavaRDD<MyObject> secondRdd = sc.parallelize(secondList); > JavaSQLContext sqlc = new JavaSQLContext(sc); > sqlc.applySchema(firstRdd, > MyObject.class).registerTempTable("table1"); > sqlc.sqlContext().cacheTable("table1"); > sqlc.applySchema(secondRdd, > MyObject.class).registerTempTable("table2"); > sqlc.sqlContext().cacheTable("table2"); > List<Row> firstMinusSecond = sqlc.sql( > "SELECT key, counter FROM table1 " + > "EXCEPT " + > "SELECT key, SUM(counter) FROM table2 " + > "GROUP BY key ").collect(); > System.out.println("num of rows in first but not in second = [" + > firstMinusSecond.size() + "]"); > sc.close(); > System.exit(0); > } > public static class MyObject implements Serializable { > public MyObject(Integer key, Integer counter) { > this.key = key; > this.counter = counter; > } > private Integer key; > private Integer counter; > public Integer getKey() { > return key; > } > public void setKey(Integer key) { > this.key = key; > } > public Integer 
getCounter() { > return counter; > } > public void setCounter(Integer counter) { > this.counter = counter; > } > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org