phet commented on a change in pull request #3438:
URL: https://github.com/apache/gobblin/pull/3438#discussion_r763673923



##########
File path: 
gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/BaseJdbcBufferedInserter.java
##########
@@ -65,6 +67,9 @@
   protected PreparedStatement insertPstmtForFixedBatch;
   private final Retryer<Boolean> retryer;
 
+  // If this config is true, the inserter can insert duplicate primary records according to the specific language
+  protected final boolean replaceExistingValues;

Review comment:
       since this field isn't referenced within this base class, one could 
argue its definition doesn't belong here.
   
   (...unless you identify logic common to all derivations that could be 
*encapsulated* within the base class--then situating it here would make sense.)

##########
File path: 
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
##########
@@ -216,6 +216,11 @@
   public static final String QUEUED_TASK_TIME_MAX_AGE = "taskexecutor.queued_task_time.history.max_age";
   public static final long DEFAULT_QUEUED_TASK_TIME_MAX_AGE = TimeUnit.HOURS.toMillis(1);
 
+  /**
+   * Optional property to specify whether existing data in databases can be overwritten during ingestion jobs
+   */
+  public static final String ALLOW_DATA_OVERWRITE = "allow.data.overwrite";

Review comment:
       since the javadoc elaborates beyond what the name conveys, the name 
alone may not adequately convey the scope.  this is about database-writing 
specifically (not, say, HDFS).  should we even consider it JDBC-specific?  
some ideas:
   `allow.database.record.overwrite`, `allow.jdbc[.record].overwrite`.
   I'm unsure of the best name, just noting how general the current name reads 
(compared to how it's implemented and intended).

##########
File path: 
gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/MySqlBufferedInserterTest.java
##########
@@ -63,7 +93,7 @@ public void testMySqlBufferedInsert() throws SQLException {
     }
     inserter.flush();
 
-    verify(conn, times(2)).prepareStatement(anyString());
+    verify(conn, times(2)).prepareStatement(matches("REPLACE INTO .*"));

Review comment:
       nice--clear and direct.  could we do something similar for 
`MySqlWriterCommands`?  (does it have a unit test yet for you to build on?)

##########
File path: 
gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlBufferedInserter.java
##########
@@ -84,6 +93,26 @@ protected void initializeBatch(String databaseName, String table)
          + " due to # of params limitation " + this.maxParamSize + " , # of columns: " + this.columnNames.size());
     }
     this.batchSize = actualBatchSize;
-    super.initializeBatch(databaseName, table);
+
+    // Use separate insertion statement if replacements are allowed
+    if (this.replaceExistingValues) {
+        this.insertStmtPrefix = createReplaceStatementStr(databaseName, table);

Review comment:
       possibly more direct to splice in the possible alternative with an 
override of `createInsertStatementStr` by this derived class.  what do you 
think?
   
   (in general, the fewer reassignments derived classes make to base class 
fields, the better.  it's easier to reason about and less prone to error to have 
derived classes compute a value differently, in their own way, than it is to 
consider them potentially taking over responsibility for setting up or 
maintaining state, previously the province of the base.)
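
   a rough sketch of the shape I have in mind, using simplified stand-in 
classes rather than the real ones.  the `*Sketch` class names, the constructor, 
and the statement text are assumptions for illustration only; only 
`createInsertStatementStr`, `initializeBatch`, `insertStmtPrefix` and 
`replaceExistingValues` come from the PR:

```java
// Hypothetical stand-in for the base inserter; NOT the actual Gobblin class.
class BaseInserterSketch {
  protected String insertStmtPrefix;

  // Only the base class ever assigns its own field.
  protected void initializeBatch(String databaseName, String table) {
    this.insertStmtPrefix = createInsertStatementStr(databaseName, table);
  }

  // Derived classes override this hook to compute the prefix their own way.
  // The statement text is simplified (assumption: the real one lists columns).
  protected String createInsertStatementStr(String databaseName, String table) {
    return String.format("INSERT INTO %s.%s VALUES ", databaseName, table);
  }
}

// Hypothetical stand-in for the MySQL-specific inserter.
class MySqlInserterSketch extends BaseInserterSketch {
  private final boolean replaceExistingValues;

  MySqlInserterSketch(boolean replaceExistingValues) {
    this.replaceExistingValues = replaceExistingValues;
  }

  @Override
  protected String createInsertStatementStr(String databaseName, String table) {
    // Compute a different value instead of reassigning the base class field.
    return this.replaceExistingValues
        ? String.format("REPLACE INTO %s.%s VALUES ", databaseName, table)
        : super.createInsertStatementStr(databaseName, table);
  }
}
```

   with this shape `initializeBatch` in the derived class would not need to 
touch `insertStmtPrefix` at all.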

##########
File path: 
gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/BaseJdbcBufferedInserter.java
##########
@@ -80,6 +85,8 @@ public BaseJdbcBufferedInserter(State state, Connection conn) {
     this.retryer = RetryerBuilder.<Boolean> newBuilder().retryIfException()
         .withWaitStrategy(WaitStrategies.exponentialWait(1000, maxWait, TimeUnit.SECONDS))
         .withStopStrategy(StopStrategies.stopAfterAttempt(maxAttempts)).build();
+
+    this.replaceExistingValues = state.getPropAsBoolean(ConfigurationKeys.ALLOW_DATA_OVERWRITE);

Review comment:
       `JdbcWriterCommandsFactory` already fails fast on an unsupported 
`Destination`.  how about adding the check there (specifically, inform each 
`*WriterCommands` via a third ctor param, so the specific type may fail if not 
up to the task)?  the `*BufferedInserter`s are only constructed by the 
`*WriterCommands`, so a check there should cover both cases.
   
   the factory would read the config and thereafter a `boolean` is passed to 
the ctors (and beyond).
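
   roughly like the following sketch.  everything here is a simplified, 
hypothetical stand-in: the `*Sketch` types, the string destination names, and 
the use of `java.util.Properties` in place of `State` are assumptions, as is 
the existence of a destination that cannot overwrite:

```java
import java.util.Properties;

// Hypothetical stand-ins; NOT the actual Gobblin types or signatures.
interface WriterCommandsSketch {
  boolean overwritesRecords();
}

class MySqlWriterCommandsSketch implements WriterCommandsSketch {
  private final boolean overwriteRecords;

  MySqlWriterCommandsSketch(boolean overwriteRecords) {
    this.overwriteRecords = overwriteRecords;
  }

  @Override
  public boolean overwritesRecords() {
    return this.overwriteRecords;
  }
}

// Assumed example of a destination type that cannot honor the flag.
class AppendOnlyWriterCommandsSketch implements WriterCommandsSketch {
  AppendOnlyWriterCommandsSketch(boolean overwriteRecords) {
    if (overwriteRecords) {
      // The specific type fails fast when it is not up to the task.
      throw new IllegalArgumentException("record overwrite is not supported for this destination");
    }
  }

  @Override
  public boolean overwritesRecords() {
    return false;
  }
}

class WriterCommandsFactorySketch {
  static final String ALLOW_DATA_OVERWRITE = "allow.data.overwrite";

  static WriterCommandsSketch newInstance(String destinationType, Properties props) {
    // The factory is the only place that reads the config ...
    boolean overwrite = Boolean.parseBoolean(props.getProperty(ALLOW_DATA_OVERWRITE, "false"));
    // ... and from here on a plain boolean is passed to the ctors.
    switch (destinationType) {
      case "MYSQL":
        return new MySqlWriterCommandsSketch(overwrite);
      case "APPEND_ONLY":
        return new AppendOnlyWriterCommandsSketch(overwrite);
      default:
        throw new IllegalArgumentException(destinationType + " is not supported");
    }
  }
}
```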

##########
File path: 
gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlWriterCommands.java
##########
@@ -145,7 +149,13 @@ public void drop(String database, String table) throws SQLException {
 
   @Override
   public void copyTable(String databaseName, String from, String to) throws SQLException {
-    String sql = String.format(COPY_INSERT_STATEMENT_FORMAT, databaseName, to, databaseName, from);
+    // Chooses between INSERT and REPLACE logic based on the job configurations
+    String sql;
+    if (this.replaceExistingValues) {
+      sql = String.format(COPY_REPLACE_STATEMENT_FORMAT, databaseName, to, databaseName, from);
+    } else {
+      sql = String.format(COPY_INSERT_STATEMENT_FORMAT, databaseName, to, databaseName, from);
+    }
+    }

Review comment:
       minor: the ternary op `c ? x : y` is nice for avoiding the need to 
initialize empty.  here it could even be tucked within the first arg expr to 
`String.format`.
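
   for instance, something along these lines.  the format strings below are 
placeholders I made up for the sketch (the real constants may differ), and 
`CopyTableSketch` / `copySql` are hypothetical names:

```java
// Hypothetical stand-in; NOT the actual MySqlWriterCommands.
class CopyTableSketch {
  // Placeholder format strings, assumed for illustration only.
  static final String COPY_INSERT_STATEMENT_FORMAT = "INSERT INTO %s.%s SELECT * FROM %s.%s";
  static final String COPY_REPLACE_STATEMENT_FORMAT = "REPLACE INTO %s.%s SELECT * FROM %s.%s";

  static String copySql(boolean replaceExistingValues, String databaseName, String from, String to) {
    // The ternary picks the format inline, so no uninitialized local is needed.
    return String.format(
        replaceExistingValues ? COPY_REPLACE_STATEMENT_FORMAT : COPY_INSERT_STATEMENT_FORMAT,
        databaseName, to, databaseName, from);
  }
}
```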

##########
File path: 
gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/MySqlBufferedInserter.java
##########
@@ -34,12 +34,21 @@
  * The implementation of JdbcBufferedInserter for MySQL.
  * This purpose of buffered insert is mainly for performance reason and the implementation is based on the
  * reference manual http://dev.mysql.com/doc/refman/5.0/en/insert-speed.html
+ *
+ * This class supports two types of insertions for MySQL 1) standard insertion - only supports records with unique
+ * primary keys and fails on attempted insertion of a duplicate record 2) replace insertion - inserts new records as
+ * normal but allows for value overwrites for duplicate inserts (by primary key)
+ *
+ * Note that replacement occurs at 'record-level', so if there are duplicates in the same input then they will replace
+ * each other in a non-deterministic order.

Review comment:
       nice docs!  perhaps check the `insert-speed` link to make sure it still 
makes sense.  if so, update to mysql 8 link.



