[ 
https://issues.apache.org/jira/browse/SQOOP-1168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14269746#comment-14269746
 ] 

Veena Basavaraj commented on SQOOP-1168:
----------------------------------------

Here are the meeting notes.

Time: Jan 7th, 2:00 - 3:30

Meeting minutes (TL;DR, but I had to be thorough since the devil is in the 
details :)

Attendees: Anand, Abe, Gwen, Jarcec

Veena presented an overview of the design in the meeting. 
The 3 main points presented were:

1. How will connectors indicate the parameters/inputs for the delta fetch (in 
the FROM part) and the parameters/inputs for the delta merge (in the TO part)?
The exact syntax of the config class and inputs is laid out in the design 
wiki. Each direction in the connector (FROM/TO) can be configured to support 
delta updates independently. This allows a connector to support delta records 
(new and/or old records that changed) in just one direction and add support 
for the other later. For example, a JDBC connector can support delta reads at 
first and later add support for delta writes.
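
A minimal sketch of point 1, assuming a simplified model rather than the actual 
Sqoop2 connector API. The class name, field names and input names 
("delta_query", "merge_key") are all hypothetical; the point is only that each 
direction declares its delta inputs independently.

```python
from dataclasses import dataclass
from typing import Dict, Optional


# Hypothetical model, not the Sqoop2 connector API: each direction carries
# its own optional set of delta inputs (input name -> description).
@dataclass
class Connector:
    name: str
    from_delta_inputs: Optional[Dict[str, str]] = None  # delta fetch (FROM)
    to_delta_inputs: Optional[Dict[str, str]] = None    # delta merge (TO)

    def supports_delta(self, direction: str) -> bool:
        """True if the connector exposes delta inputs for FROM or TO."""
        inputs = {"FROM": self.from_delta_inputs, "TO": self.to_delta_inputs}
        return inputs[direction] is not None


# First release: delta reads only.
jdbc_v1 = Connector(
    "jdbc",
    from_delta_inputs={"delta_query": "predicate selecting records to read"},
)

# Later release: delta writes added without touching the FROM side.
jdbc_v2 = Connector(
    "jdbc",
    from_delta_inputs=jdbc_v1.from_delta_inputs,
    to_delta_inputs={"merge_key": "column used to match existing records"},
)
```

With this shape, a job only shows the delta inputs for a direction when 
supports_delta returns True for it.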

2. How will the user be presented with the new configs, both from the command 
line and the REST API?
In a nutshell, it will use the same technique we have today to collect the 
fromJob, toJob and driver configs when creating a job. The semantics of job 
creation from the command line will remain the same as today:
create job -from 1 -to 2

The "from" and "to" in the above command line represent the source the data 
is read from and the destination the data is written to.
The user will be presented with the inputs to fill out for the "FROM" part, 
identified by id 1, and the "TO" part, identified by id 2.

If the FROM part exposes a delta fetch/read config, it will be presented to the 
user. If the user fills out the values, they will be passed to the FROM 
part of the connector, and the connector can act on them in the EXTRACT/READ 
part of the Sqoop job lifecycle.

sqoop:000> create job -f 1 -t 2
 Creating job for links with from id 1 and to id 2
 Please fill following values to create new job object
 Name: Sqoopy

 FromJob configuration

  Schema name:(Required)sqoop
  Table name:(Required)sqoop
  Table SQL statement:(Optional)
  
 Delta Fetch Configuration
   delta query:(Optional): id > 40

Similarly for the TO part: if the connector identified by the "TO" id exposes 
a delta config, it will be presented. The user can choose to fill in the 
values or ignore them; the connector will be given these values and can do 
what it needs to in the TO/LOAD part of the job lifecycle.

3. The last part is to provide a generic way to store any information/data 
generated in the job lifecycle that might be relevant to the next job run. 
In a normal full-scan read and write, there is no need to store state across 
runs; every run starts from a clean slate. With delta updates (either delta 
fetch or delta merge), it is highly likely that a connector wants to store 
information from the previous run to use in the next run. As a simple example, 
a JDBC connector that supports delta fetch/read of new records can store a 
marker for the last record read so far. In the next run, the user is not 
required to supply this marker; the connector can infer it by itself. But if 
the user provides a value, that value is used instead, so that user input 
always takes priority. Hence we need a place to store the state information 
the connector generates. Ideally this should be part of the job run history, 
which in Sqoop is held in the submission table. The proposal is to store 
this state in another table (KEY / VALUE / TYPE / CONNECTOR-VERSION) to 
identify what state was generated in every run. Lastly, if the requirement is 
to allow users to edit the config inputs and/or the STATE information, we will 
provide an API for it and a command-line utility as well.
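
The priority rule in point 3 can be sketched as follows. This is only an 
illustration under assumptions: the StateEntry shape mirrors the proposed 
KEY / VALUE / TYPE / CONNECTOR-VERSION columns, while the function name, the 
"last_id" key and the fall-back to a full read are hypothetical and were not 
settled in the meeting.

```python
from typing import Dict, NamedTuple, Optional


# Hypothetical row shape following the proposed KEY / VALUE / TYPE /
# CONNECTOR-VERSION columns; the real schema is not defined in these notes.
class StateEntry(NamedTuple):
    key: str
    value: str
    type: str
    connector_version: str


def resolve_marker(
    user_value: Optional[int],
    previous_state: Dict[str, StateEntry],
    key: str = "last_id",
) -> Optional[int]:
    """Pick the marker for the next delta fetch; user input always wins."""
    if user_value is not None:
        return user_value
    entry = previous_state.get(key)
    if entry is None:
        # Assumption: with no marker at all, the connector does a full read.
        return None
    return int(entry.value)


# State written by the previous run: the last record read had id 50.
state_after_run = {"last_id": StateEntry("last_id", "50", "LONG", "1")}
```

Calling resolve_marker(None, state_after_run) uses the stored marker, while 
resolve_marker(40, state_after_run) keeps the user's value.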


Discussion Points in the Meeting


Question 1. Do we need to explicitly state whether it is append or last 
modified, like we did in sqoop1, so that users coming from sqoop1 are not 
confused? (Question asked by Jarcec)
Veena: Sqoop2 is generic and much more than sqoop1; it is any-to-any and 
no longer SQL-to-Hadoop. Second, there are 2 parts to the delta update: delta 
fetch/read and delta write/merge. Append is a name relevant to writing, not 
reading, and last modified is mostly relevant to JDBC-like sources. A 
source such as Kafka has no concept of last modified. Hence the goal is to keep 
it generic and allow every connector to name its parameters as suits it best.

So a Kafka connector might ask in the delta config which partition to read 
from (so that it does not scan every partition each time), and an FTP 
connector might take a pattern saying it only wants to read the *.jpg files. 
On the TO side, the Kafka connector can define its own terminology for how 
records will be written, whether it creates a new partition or adds to an 
existing partition. Note that we have not even addressed how deleted records 
will be handled in these updates. But a connector can now ask for inputs on 
each of these independently, without requiring any Sqoop changes. Hence the 
design is generic and extensible to any connector added in the future.
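
As an illustration of a connector-defined delta input, here is a small sketch 
of the FTP pattern example. The function and the input name "file_pattern" are 
hypothetical; only the idea of filtering on *.jpg comes from the discussion.

```python
from fnmatch import fnmatch
from typing import Dict, List


# Hypothetical delta config for an FTP-like connector: the input name
# "file_pattern" is chosen by the connector, not by Sqoop.
def select_files(listing: List[str], delta_config: Dict[str, str]) -> List[str]:
    pattern = delta_config.get("file_pattern")
    if pattern is None:
        return list(listing)  # no delta input given: read everything
    return [name for name in listing if fnmatch(name, pattern)]


listing = ["a.jpg", "b.txt", "c.jpg"]
selected = select_files(listing, {"file_pattern": "*.jpg"})
```

A Kafka connector would expose entirely different inputs (for example a 
partition to read from) through the same mechanism.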

STATUS: Resolved


Question 2. Do we need to explicitly distinguish between "new records" and 
"existing records" when we create a job? (Question asked by Jarcec)

More aptly rephrased, the distinction is between "only new" and "both new and 
old" (it is hard to say that only existing records are allowed in a delta 
update; a delta can contain a new record or an old record whose values 
changed).

Veena: The way I see it, FROM and TO are the 2 parts of the job, given the 
semantics of the command line:
create job -from 1 -to 2

The "from" and "to" in the above command line represent the source the data 
is read from and the destination the data is written to.

The user then fills out what records are to be read and how, and what records 
are to be written and how.

So let's take some scenarios.

FROM: JDBC, which supports delta fetch, i.e. it exposes a config that tells 
it what records to read. If the user fills in those parameters, the connector 
can infer what records to fetch and how to fetch them. The input might be a 
free-form query such as "id > 40" or "name LIKE '%foo'".

TO: HDFS, which supports 3 ways of writing, all presented as configs, like 
an ENUM:
Option 1. Create a new set of files for the given URI (assuming the URI does 
not already exist; it might do some validation to ensure that).
Option 2. Append to the given URI, which might create dupes.
Option 3. Take the given set of records and overwrite existing ones with the 
new values; basically a form of merging that does not create dupes.
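
The three options can be sketched with in-memory records standing in for HDFS 
files. This is an illustration only: the enum names, the (id, value) record 
shape and the error raised for an existing target are assumptions, not the 
HDFS connector's actual behavior.

```python
from enum import Enum
from typing import Dict, List, Tuple

Record = Tuple[int, str]  # (id, value); a simplified stand-in for a row


# Hypothetical names for the three options described above.
class WriteMode(Enum):
    CREATE_NEW = 1  # Option 1: write to a fresh target
    APPEND = 2      # Option 2: append, duplicates possible
    MERGE = 3       # Option 3: overwrite matching records, no duplicates


def write(existing: List[Record], delta: List[Record], mode: WriteMode) -> List[Record]:
    if mode is WriteMode.CREATE_NEW:
        if existing:
            raise ValueError("target already exists")
        return list(delta)
    if mode is WriteMode.APPEND:
        return existing + delta
    # MERGE: a delta record replaces an existing record with the same id;
    # ids not seen before are simply added.
    merged: Dict[int, str] = dict(existing)
    merged.update(dict(delta))
    return sorted(merged.items())


existing = [(39, "a"), (40, "b")]
delta = [(40, "b2"), (41, "c")]
appended = write(existing, delta, WriteMode.APPEND)
merged = write(existing, delta, WriteMode.MERGE)
```

Here appended holds two records with id 40 (the dupe), while merged keeps only 
the updated one.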


With this in mind, when a user creates a job with FROM as JDBC and TO 
as HDFS, a few things can happen.

Case 1: On the FROM side, the user says select records where id > 40, which 
means the user is aware that all records up to 40 were read in the last run. 
On the TO side, the user selects option 2 for the HDFS config. The 
Sqoop job will read the records with id > 40 and then write them to HDFS as an 
append. Looks good so far.

Case 2: On the FROM side, the user says select records where id > 40, which 
means the user is aware that all records up to 40 were read in the last run. 
On the TO side, the user selects option 1, so a new HDFS URI is created for 
the records after 40 instead of appending to the existing one. I would say 
this is not an issue, since the user chose this option and is aware of the 
action taken. Maybe they wanted it this way. There is no magic going on; the 
user got what they chose.

Case 3: On the FROM side, the user says select records where id > 40, which 
means the user is aware that all records up to 40 were read in the last run. 
On the TO side, the user selects option 3, so the HDFS connector will try to 
meticulously merge with the existing records. Since it does not find records 
to merge with, it will fall back to creating new records and appending them. 
In this case the connector might spend more time than required on the write, 
but the user was aware of this when choosing option 3. With JDBC it might be 
clear that there is an ordering by ID, so the user can in fact be smart and 
choose the right option for the TO based on the FROM query they chose. We can 
even HINT at the right form of TO based on the FROM query chosen, but it is 
still up to the user to choose what they want.


Case 4: On the FROM side, we can have a query that chooses records where 
"name" has "FOO" in it, or "timestamp" since "mar202010", or 
"partition_id = 5". In this case, the set of records fetched over JDBC has no 
sequential primary-key ordering at all; it is just a set of messages or 
records read. On the TO side, the user might again choose option 2, knowing 
that dupes may be created; they in fact chose it explicitly. If they choose 
option 3, then HDFS will try to merge them.

In some cases the destination/TO may not even support merging, so the 
user has no option but to append the delta records with dupes; this is a 
known behavior in sqoop1. Having discussed the above use cases, it was 
clear that it is up to the user to make sound decisions on how the delta 
fetch and merge will happen in a Sqoop job.

Team decision, since Abe/Gwen/Anand chimed in as well: it was decided that we 
can be more explicit and not let users choose the options. Instead, if the 
user on the FROM side chooses to read a sequential set of records, then force 
the TO to be "append" (option 2 in the HDFS example above). This means we 
make the create job semantics explicit, like the following (again, the NEW and 
OLD names are open to change; we want a name that is most intuitive and 
semantically correct for users, considering that Sqoop2 is generic and append 
or last modified are no longer as relevant as they were in sqoop1):

create job -from 1 -to 2 --type "ADD_NEW_RECORDS"
create job -from 1 -to 2 --type "ADD_NEW_AND_OLD_RECORDS"

Alternatively (as Veena proposed), make it a config option that every FROM 
part of a connector is required to ask for:
sqoop:000> create job -f 1 -t 2
 Creating job for links with from id 1 and to id 2
 Please fill following values to create new job object
 Name: Sqoopy

 FromJob configuration

  Schema name:(Required)sqoop
  Table name:(Required)sqoop
  Table SQL statement:(Optional)
  
 Delta Fetch Configuration
   full read
   delta read for new records
   delta read for new and old records

But Veena still thinks that this explicit indicator is not as useful as it 
sounds. Users are smart enough to choose what they want, as long as the 
options are clearly stated.
STATUS: Unresolved
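
For reference, the team proposal under Question 2 amounts to a mapping from 
the job type to a forced TO-side mode. The sketch below is hypothetical: the 
job type names follow the proposal (and may change), the function name is 
invented, and the mode for "new and old" records is deliberately left 
unspecified because the notes do not pin it down.

```python
from typing import Optional

# Job type names follow the proposal in the minutes, which notes they may change.
ADD_NEW_RECORDS = "ADD_NEW_RECORDS"
ADD_NEW_AND_OLD_RECORDS = "ADD_NEW_AND_OLD_RECORDS"


def forced_to_mode(job_type: str) -> Optional[str]:
    """Return the TO-side write mode the job type dictates, or None when the
    minutes do not state one."""
    if job_type == ADD_NEW_RECORDS:
        # Sequential new records on the FROM side force an append on the TO side.
        return "APPEND"
    if job_type == ADD_NEW_AND_OLD_RECORDS:
        return None  # not specified in the meeting notes
    raise ValueError(f"unknown job type: {job_type}")
```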

Question 3: What do we want to store for STATE? Where to store it? How to 
store it? How long to store it? Should it be any key/value, or should it be 
stated explicitly (a question asked in JIRA prior to this meeting)? Question 
from Abe/Jarcec and Gwen.

Since there were many questions in this one big question, we tackled them one 
by one.

Abe said that we should not store a BLOB but typed data, for instance 
INT/LONG/STRING, so that upgrades are easier. All accepted the point that we 
may want to store the STATE information generated by the FROM and TO parts of 
the connector as typed values. Jarcec also agreed that he sees why Veena 
proposed a generic state table to store additional info: delta writing in 
certain connectors can get fancy, and it would be nice to provide a means to 
store information across these delta update runs so they run optimally.
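
Abe's point about typed values instead of a BLOB can be illustrated with a 
small encode/decode pair. This is a sketch under assumptions: the type tags 
"LONG" and "STRING" are taken from the examples named in the meeting, and the 
text-plus-tag encoding is hypothetical.

```python
from typing import Tuple, Union

StateValue = Union[int, str]


# Hypothetical encoding: each value is stored as text next to a type tag
# instead of inside an opaque blob.
def encode(value: StateValue) -> Tuple[str, str]:
    if isinstance(value, bool):
        raise TypeError("unsupported state type: bool")
    if isinstance(value, int):
        return ("LONG", str(value))
    if isinstance(value, str):
        return ("STRING", value)
    raise TypeError(f"unsupported state type: {type(value).__name__}")


def decode(type_tag: str, text: str) -> StateValue:
    if type_tag == "LONG":
        return int(text)
    if type_tag == "STRING":
        return text
    raise ValueError(f"unknown type tag: {type_tag}")
```

Because every stored value carries its type, an upgrade can read old rows 
without knowing how a particular connector version serialized them.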

Jarcec raised an interesting point: why do we need a STATE table, and why 
don't we modify the config table itself? For example, in the JDBC example 
described earlier, the user says read all records > 40, so the config input 
has the value 40 stored when the job starts. Jarcec's suggestion was to 
change this value to 50 when the job finishes, since the records 40 to 50 
were read and written and the last record written had the primary key 50.

Pros: No work at all; we just use the config table.
Cons: The biggest one is that we are tampering with what the user entered and 
changing it without his/her knowledge. This means we have no history of what 
the user entered. Across multiple job runs, the user may have entered a value, 
but if the job failed we may have his/her value, and if the job succeeded we 
may have another value that Sqoop wrote. At any point it is hard to infer who 
changed this value and what the source of truth is.
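
The trade-off can be made concrete with a toy comparison. Everything here is 
hypothetical (the dictionary keys, the record_run helper); it only contrasts 
overwriting the user's input with keeping a separate per-run state history.

```python
from typing import Dict, List

# Option A (modify the config table): overwrite the user's input in place.
config = {"delta_min_id": 40}   # what the user typed
config["delta_min_id"] = 50     # rewritten when the job finishes
# The original 40 is gone, and nothing records who set the current value.

# Option B (separate state): the config stays untouched and each run
# appends its own state entry.
user_config = {"delta_min_id": 40}
state_history: List[Dict[str, int]] = []


def record_run(run_id: int, last_id: int) -> None:
    """Append the marker produced by one job run to the history."""
    state_history.append({"run": run_id, "last_id": last_id})


record_run(1, 50)
```

With option B the user's input and the value Sqoop derived remain 
distinguishable after the run.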

Anand/Veena/Gwen: agreed that tampering with the input given by a user will 
lead to confusion and to issues that are even harder to debug.
Jarcec: still thinks it is not a concern and wants to explore how other 
systems like Hive/Oozie do it.
Veena: it is important to understand what a user wants and how to leave the 
user less confused, rather than mimic what Hive, Oozie or sqoop1 did.

STATUS: Unresolved

Question 4: What is the scope of the SQOOP-1168 ticket? (From Veena)
Since the overall design pushes a lot of functionality to the connectors, the 
current goal is a POC in the JDBC connector and HBase (via Kite). HDFS, Hive 
and other data sources are independent work items as of now, and the exit 
criteria for 1168 need to be clearly defined.

STATUS: Unresolved

A further meeting will be held to resolve the unresolved issues.


> Sqoop2: Delta Fetch/ Merge ( formerly called Incremental Import )
> -----------------------------------------------------------------
>
>                 Key: SQOOP-1168
>                 URL: https://issues.apache.org/jira/browse/SQOOP-1168
>             Project: Sqoop
>          Issue Type: Bug
>            Reporter: Hari Shreedharan
>            Assignee: Veena Basavaraj
>             Fix For: 1.99.5
>
>
> The formal design wiki is here 
> https://cwiki.apache.org/confluence/display/SQOOP/Delta+Fetch+and+Merge+Design



