[ https://issues.apache.org/jira/browse/PHOENIX-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080567#comment-16080567 ]
James Taylor commented on PHOENIX-3999: --------------------------------------- Why the limitation of not pulling over columns from the RHS, [~maryannxue]? > Optimize inner joins as SKIP-SCAN-JOIN when possible > ---------------------------------------------------- > > Key: PHOENIX-3999 > URL: https://issues.apache.org/jira/browse/PHOENIX-3999 > Project: Phoenix > Issue Type: Bug > Reporter: James Taylor > > Semi joins on the leading part of the primary key end up doing batches of > point queries (as opposed to a broadcast hash join), however inner joins do > not. > Here's a set of example schemas that executes a skip scan on the inner query: > {code} > CREATE TABLE COMPLETED_BATCHES ( > BATCH_SEQUENCE_NUM BIGINT NOT NULL, > BATCH_ID BIGINT NOT NULL, > CONSTRAINT PK PRIMARY KEY > ( > BATCH_SEQUENCE_NUM, > BATCH_ID > ) > ); > CREATE TABLE ITEMS ( > BATCH_ID BIGINT NOT NULL, > ITEM_ID BIGINT NOT NULL, > ITEM_TYPE BIGINT, > ITEM_VALUE VARCHAR, > CONSTRAINT PK PRIMARY KEY > ( > BATCH_ID, > ITEM_ID > ) > ); > CREATE TABLE COMPLETED_ITEMS ( > ITEM_TYPE BIGINT NOT NULL, > BATCH_SEQUENCE_NUM BIGINT NOT NULL, > ITEM_ID BIGINT NOT NULL, > ITEM_VALUE VARCHAR, > CONSTRAINT PK PRIMARY KEY > ( > ITEM_TYPE, > BATCH_SEQUENCE_NUM, > ITEM_ID > ) > ); > {code} > The explain plan of these indicates that a dynamic filter will be performed > like this: > {code} > UPSERT SELECT > CLIENT PARALLEL 1-WAY FULL SCAN OVER ITEMS > SKIP-SCAN-JOIN TABLE 0 > CLIENT PARALLEL 1-WAY RANGE SCAN OVER COMPLETED_BATCHES [1] - [2] > SERVER FILTER BY FIRST KEY ONLY > SERVER AGGREGATE INTO DISTINCT ROWS BY [BATCH_ID] > CLIENT MERGE SORT > DYNAMIC SERVER FILTER BY I.BATCH_ID IN ($8.$9) > {code} > We should also be able to leverage this optimization when an inner join is > used such as this: > {code} > UPSERT INTO COMPLETED_ITEMS (ITEM_TYPE, BATCH_SEQUENCE_NUM, ITEM_ID, > ITEM_VALUE) > SELECT i.ITEM_TYPE, b.BATCH_SEQUENCE_NUM, i.ITEM_ID, i.ITEM_VALUE > FROM ITEMS i, COMPLETED_BATCHES b > WHERE b.BATCH_ID = 
i.BATCH_ID AND > b.BATCH_SEQUENCE_NUM > 1000 AND b.BATCH_SEQUENCE_NUM < 2000; > {code} > A complete unit test looks like this: > {code} > @Test > public void testNestedLoopJoin() throws Exception { > try (Connection conn = DriverManager.getConnection(getUrl())) { > String t1="COMPLETED_BATCHES"; > String ddl1 = "CREATE TABLE " + t1 + " (\n" + > " BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" + > " BATCH_ID BIGINT NOT NULL,\n" + > " CONSTRAINT PK PRIMARY KEY\n" + > " (\n" + > " BATCH_SEQUENCE_NUM,\n" + > " BATCH_ID\n" + > " )\n" + > ")" + > ""; > conn.createStatement().execute(ddl1); > > String t2="ITEMS"; > String ddl2 = "CREATE TABLE " + t2 + " (\n" + > " BATCH_ID BIGINT NOT NULL,\n" + > " ITEM_ID BIGINT NOT NULL,\n" + > " ITEM_TYPE BIGINT,\n" + > " ITEM_VALUE VARCHAR,\n" + > " CONSTRAINT PK PRIMARY KEY\n" + > " (\n" + > " BATCH_ID,\n" + > " ITEM_ID\n" + > " )\n" + > ")"; > conn.createStatement().execute(ddl2); > String t3="COMPLETED_ITEMS"; > String ddl3 = "CREATE TABLE " + t3 + "(\n" + > " ITEM_TYPE BIGINT NOT NULL,\n" + > " BATCH_SEQUENCE_NUM BIGINT NOT NULL,\n" + > " ITEM_ID BIGINT NOT NULL,\n" + > " ITEM_VALUE VARCHAR,\n" + > " CONSTRAINT PK PRIMARY KEY\n" + > " (\n" + > " ITEM_TYPE,\n" + > " BATCH_SEQUENCE_NUM, \n" + > " ITEM_ID\n" + > " )\n" + > ")"; > conn.createStatement().execute(ddl3); > conn.createStatement().execute("UPSERT INTO > "+t1+"(BATCH_SEQUENCE_NUM, batch_id) VALUES (1,2)"); > conn.createStatement().execute("UPSERT INTO > "+t1+"(BATCH_SEQUENCE_NUM, batch_id) VALUES (1,4)"); > conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, > item_id, item_type, item_value) VALUES (1,100, 10, 'a')"); > conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, > item_id, item_type, item_value) VALUES (2,200, 20, 'a')"); > conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, > item_id, item_type, item_value) VALUES (3,300, 10, 'a')"); > conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, > item_id, item_type, item_value) VALUES (4,400, 
20, 'a')"); > conn.createStatement().execute("UPSERT INTO "+t2+"(batch_id, > item_id, item_type, item_value) VALUES (5,500, 10, 'a')"); > conn.commit(); > > conn.setAutoCommit(true); > String dml = "UPSERT INTO " + t3 + " (ITEM_TYPE, > BATCH_SEQUENCE_NUM, ITEM_ID, ITEM_VALUE)\n" + > "SELECT ITEM_TYPE, 1, ITEM_ID, ITEM_VALUE \n" + > "FROM " + t2 + " i\n" + > "WHERE EXISTS (" + > " SELECT 1 FROM " + t1 + " b WHERE b.BATCH_ID = > i.BATCH_ID AND " + > " b.BATCH_SEQUENCE_NUM > 0 AND b.BATCH_SEQUENCE_NUM < > 2)"; > conn.createStatement().execute(dml); > ResultSet rs = conn.createStatement().executeQuery("SELECT > ITEM_ID FROM " + t3); > assertTrue(rs.next()); > assertEquals(rs.getLong(1), 200L); > assertTrue(rs.next()); > assertEquals(rs.getLong(1), 400L); > assertFalse(rs.next()); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)