From 86fd522d40eefc6320ee4f963404454855d5dcfc Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Fri, 9 Jun 2017 23:41:51 +0300
Subject: [PATCH 1/2] Allow ignoring some errors during COPY FROM

---
 contrib/file_fdw/file_fdw.c |   4 +-
 src/backend/commands/copy.c | 294 ++++++++++++++++++++++++++------------------
 src/include/commands/copy.h |   2 +-
 3 files changed, 178 insertions(+), 122 deletions(-)

diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 277639f6e9..1d855ca6e7 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -689,7 +689,7 @@ fileIterateForeignScan(ForeignScanState *node)
 {
 	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
-	bool		found;
+	int		found;
 	ErrorContextCallback errcallback;
 
 	/* Set up callback to identify error line number. */
@@ -1080,7 +1080,7 @@ file_acquire_sample_rows(Relation onerel, int elevel,
 	TupleDesc	tupDesc;
 	Datum	   *values;
 	bool	   *nulls;
-	bool		found;
+	int		    found;
 	char	   *filename;
 	bool		is_program;
 	List	   *options;
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 0a33c40c17..d40d753c47 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -54,6 +54,16 @@
 #define OCTVALUE(c) ((c) - '0')
 
 /*
+    NextCopyFrom states:
+    0 – Error or stop
+    1 – Successfully read data
+    2 – Data with errors, skip
+*/
+#define NCF_STOP         0
+#define NCF_SUCCESS      1
+#define NCF_SKIP         2
+
+/*
  * Represents the different source/dest cases we need to worry about at
  * the bottom level
  */
@@ -139,6 +149,7 @@ typedef struct CopyStateData
 	int			cur_lineno;		/* line number for error messages */
 	const char *cur_attname;	/* current att for error messages */
 	const char *cur_attval;		/* current att value for error messages */
+    bool        ignore_errors;  /* ignore errors during COPY FROM */
 
 	/*
 	 * Working state for COPY TO/FROM
@@ -2310,6 +2321,7 @@ CopyFrom(CopyState cstate)
 	bool		useHeapMultiInsert;
 	int			nBufferedTuples = 0;
 	int			prev_leaf_part_index = -1;
+    int         next_cf_state; /* NextCopyFrom return state */
 
 #define MAX_BUFFERED_TUPLES 1000
 	HeapTuple  *bufferedTuples = NULL;	/* initialize to silence warning */
@@ -2519,116 +2531,125 @@ CopyFrom(CopyState cstate)
 		/* Switch into its memory context */
 		MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 
-		if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
-			break;
-
-		/* And now we can form the input tuple. */
-		tuple = heap_form_tuple(tupDesc, values, nulls);
-
-		if (loaded_oid != InvalidOid)
-			HeapTupleSetOid(tuple, loaded_oid);
-
-		/*
-		 * Constraints might reference the tableoid column, so initialize
-		 * t_tableOid before evaluating them.
-		 */
-		tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
-
-		/* Triggers and stuff need to be invoked in query context. */
-		MemoryContextSwitchTo(oldcontext);
-
-		/* Place tuple in tuple slot --- but slot shouldn't free it */
-		slot = myslot;
-		ExecStoreTuple(tuple, slot, InvalidBuffer, false);
-
-		/* Determine the partition to heap_insert the tuple into */
-		if (cstate->partition_dispatch_info)
-		{
-			int			leaf_part_index;
-			TupleConversionMap *map;
-
-			/*
-			 * Away we go ... If we end up not finding a partition after all,
-			 * ExecFindPartition() does not return and errors out instead.
-			 * Otherwise, the returned value is to be used as an index into
-			 * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
-			 * will get us the ResultRelInfo and TupleConversionMap for the
-			 * partition, respectively.
-			 */
-			leaf_part_index = ExecFindPartition(resultRelInfo,
-											 cstate->partition_dispatch_info,
-												slot,
-												estate);
-			Assert(leaf_part_index >= 0 &&
-				   leaf_part_index < cstate->num_partitions);
-
-			/*
-			 * If this tuple is mapped to a partition that is not same as the
-			 * previous one, we'd better make the bulk insert mechanism gets a
-			 * new buffer.
-			 */
-			if (prev_leaf_part_index != leaf_part_index)
-			{
-				ReleaseBulkInsertStatePin(bistate);
-				prev_leaf_part_index = leaf_part_index;
-			}
-
-			/*
-			 * Save the old ResultRelInfo and switch to the one corresponding
-			 * to the selected partition.
-			 */
-			saved_resultRelInfo = resultRelInfo;
-			resultRelInfo = cstate->partitions + leaf_part_index;
-
-			/* We do not yet have a way to insert into a foreign partition */
-			if (resultRelInfo->ri_FdwRoutine)
-				ereport(ERROR,
-						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("cannot route inserted tuples to a foreign table")));
-
-			/*
-			 * For ExecInsertIndexTuples() to work on the partition's indexes
-			 */
-			estate->es_result_relation_info = resultRelInfo;
-
-			/*
-			 * We might need to convert from the parent rowtype to the
-			 * partition rowtype.
-			 */
-			map = cstate->partition_tupconv_maps[leaf_part_index];
-			if (map)
-			{
-				Relation	partrel = resultRelInfo->ri_RelationDesc;
-
-				tuple = do_convert_tuple(tuple, map);
-
-				/*
-				 * We must use the partition's tuple descriptor from this
-				 * point on.  Use a dedicated slot from this point on until
-				 * we're finished dealing with the partition.
-				 */
-				slot = cstate->partition_tuple_slot;
-				Assert(slot != NULL);
-				ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
-				ExecStoreTuple(tuple, slot, InvalidBuffer, true);
-			}
+        next_cf_state = NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid);
 
-			tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
-		}
-
-		skip_tuple = false;
-
-		/* BEFORE ROW INSERT Triggers */
-		if (resultRelInfo->ri_TrigDesc &&
-			resultRelInfo->ri_TrigDesc->trig_insert_before_row)
-		{
-			slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
-
-			if (slot == NULL)	/* "do nothing" */
-				skip_tuple = true;
-			else	/* trigger might have changed tuple */
-				tuple = ExecMaterializeSlot(slot);
+		if (!next_cf_state) {
+			break;
 		}
+        else if (next_cf_state == NCF_SUCCESS)
+        {
+    		/* And now we can form the input tuple. */
+    		tuple = heap_form_tuple(tupDesc, values, nulls);
+
+    		if (loaded_oid != InvalidOid)
+    			HeapTupleSetOid(tuple, loaded_oid);
+
+    		/*
+    		 * Constraints might reference the tableoid column, so initialize
+    		 * t_tableOid before evaluating them.
+    		 */
+    		tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
+    		/* Triggers and stuff need to be invoked in query context. */
+    		MemoryContextSwitchTo(oldcontext);
+
+    		/* Place tuple in tuple slot --- but slot shouldn't free it */
+    		slot = myslot;
+    		ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+
+    		/* Determine the partition to heap_insert the tuple into */
+    		if (cstate->partition_dispatch_info)
+    		{
+    			int			leaf_part_index;
+    			TupleConversionMap *map;
+
+    			/*
+    			 * Away we go ... If we end up not finding a partition after all,
+    			 * ExecFindPartition() does not return and errors out instead.
+    			 * Otherwise, the returned value is to be used as an index into
+    			 * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
+    			 * will get us the ResultRelInfo and TupleConversionMap for the
+    			 * partition, respectively.
+    			 */
+    			leaf_part_index = ExecFindPartition(resultRelInfo,
+    											 cstate->partition_dispatch_info,
+    												slot,
+    												estate);
+    			Assert(leaf_part_index >= 0 &&
+    				   leaf_part_index < cstate->num_partitions);
+
+    			/*
+    			 * If this tuple is mapped to a partition that is not same as the
+    			 * previous one, we'd better make the bulk insert mechanism gets a
+    			 * new buffer.
+    			 */
+    			if (prev_leaf_part_index != leaf_part_index)
+    			{
+    				ReleaseBulkInsertStatePin(bistate);
+    				prev_leaf_part_index = leaf_part_index;
+    			}
+
+    			/*
+    			 * Save the old ResultRelInfo and switch to the one corresponding
+    			 * to the selected partition.
+    			 */
+    			saved_resultRelInfo = resultRelInfo;
+    			resultRelInfo = cstate->partitions + leaf_part_index;
+
+    			/* We do not yet have a way to insert into a foreign partition */
+    			if (resultRelInfo->ri_FdwRoutine)
+    				ereport(ERROR,
+    						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+    				 errmsg("cannot route inserted tuples to a foreign table")));
+
+    			/*
+    			 * For ExecInsertIndexTuples() to work on the partition's indexes
+    			 */
+    			estate->es_result_relation_info = resultRelInfo;
+
+    			/*
+    			 * We might need to convert from the parent rowtype to the
+    			 * partition rowtype.
+    			 */
+    			map = cstate->partition_tupconv_maps[leaf_part_index];
+    			if (map)
+    			{
+    				Relation	partrel = resultRelInfo->ri_RelationDesc;
+
+    				tuple = do_convert_tuple(tuple, map);
+
+    				/*
+    				 * We must use the partition's tuple descriptor from this
+    				 * point on.  Use a dedicated slot from this point on until
+    				 * we're finished dealing with the partition.
+    				 */
+    				slot = cstate->partition_tuple_slot;
+    				Assert(slot != NULL);
+    				ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
+    				ExecStoreTuple(tuple, slot, InvalidBuffer, true);
+    			}
+
+    			tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+    		}
+
+    		skip_tuple = false;
+
+    		/* BEFORE ROW INSERT Triggers */
+    		if (resultRelInfo->ri_TrigDesc &&
+    			resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+    		{
+    			slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
+
+    			if (slot == NULL)	/* "do nothing" */
+    				skip_tuple = true;
+    			else	/* trigger might have changed tuple */
+    				tuple = ExecMaterializeSlot(slot);
+    		}
+        }
+        else
+        {
+            skip_tuple = true;
+        }
 
 		if (!skip_tuple)
 		{
@@ -2925,6 +2946,7 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->cur_lineno = 0;
 	cstate->cur_attname = NULL;
 	cstate->cur_attval = NULL;
+    cstate->ignore_errors = true;
 
 	/* Set up variables to avoid per-attribute overhead. */
 	initStringInfo(&cstate->attribute_buf);
@@ -3202,7 +3224,7 @@ NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
  * relation passed to BeginCopyFrom. This function fills the arrays.
  * Oid of the tuple is returned with 'tupleOid' separately.
  */
-bool
+int
 NextCopyFrom(CopyState cstate, ExprContext *econtext,
 			 Datum *values, bool *nulls, Oid *tupleOid)
 {
@@ -3220,11 +3242,20 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 	int		   *defmap = cstate->defmap;
 	ExprState **defexprs = cstate->defexprs;
 
+    int         error_level = ERROR;        /* Error level for COPY FROM input data errors */
+    int         exec_state = NCF_SUCCESS;   /* Return code */
+
 	tupDesc = RelationGetDescr(cstate->rel);
 	attr = tupDesc->attrs;
 	num_phys_attrs = tupDesc->natts;
 	attr_count = list_length(cstate->attnumlist);
 	nfields = file_has_oids ? (attr_count + 1) : attr_count;
+    
+    /* Set error level to WARNING, if errors handling is turned on */
+    if (cstate->ignore_errors)
+    {
+        error_level = WARNING;
+    }
 
 	/* Initialize all values for row to NULL */
 	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
@@ -3240,13 +3271,17 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 
 		/* read raw fields in the next line */
 		if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
-			return false;
+			return NCF_STOP;
 
 		/* check for overflowing fields */
 		if (nfields > 0 && fldct > nfields)
-			ereport(ERROR,
+        {
+            exec_state = NCF_SKIP;
+			ereport(error_level,
 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 					 errmsg("extra data after last expected column")));
+        }
+
 
 		fieldno = 0;
 
@@ -3254,15 +3289,22 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 		if (file_has_oids)
 		{
 			if (fieldno >= fldct)
-				ereport(ERROR,
+            {
+                exec_state = NCF_SKIP;
+				ereport(error_level,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 						 errmsg("missing data for OID column")));
+            }
+
 			string = field_strings[fieldno++];
 
 			if (string == NULL)
-				ereport(ERROR,
+            {
+                exec_state = NCF_SKIP;
+				ereport(error_level,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 						 errmsg("null OID in COPY data")));
+            }
 			else if (cstate->oids && tupleOid != NULL)
 			{
 				cstate->cur_attname = "oid";
@@ -3270,9 +3312,13 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 				*tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
 												   CStringGetDatum(string)));
 				if (*tupleOid == InvalidOid)
-					ereport(ERROR,
+                {
+                    exec_state = NCF_SKIP;
+					ereport(error_level,
 							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 							 errmsg("invalid OID in COPY data")));
+                }
+
 				cstate->cur_attname = NULL;
 				cstate->cur_attval = NULL;
 			}
@@ -3285,10 +3331,14 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 			int			m = attnum - 1;
 
 			if (fieldno >= fldct)
-				ereport(ERROR,
+            {
+                exec_state = NCF_SKIP;
+				ereport(error_level,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 						 errmsg("missing data for column \"%s\"",
 								NameStr(attr[m]->attname))));
+            }
+
 			string = field_strings[fieldno++];
 
 			if (cstate->convert_select_flags &&
@@ -3371,14 +3421,17 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 				ereport(ERROR,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 						 errmsg("received copy data after EOF marker")));
-			return false;
+			return NCF_STOP;
 		}
 
 		if (fld_count != attr_count)
-			ereport(ERROR,
+        {
+            exec_state = NCF_SKIP;
+			ereport(error_level,
 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 					 errmsg("row field count is %d, expected %d",
 							(int) fld_count, attr_count)));
+        }
 
 		if (file_has_oids)
 		{
@@ -3393,9 +3446,12 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 														 -1,
 														 &isnull));
 			if (isnull || loaded_oid == InvalidOid)
-				ereport(ERROR,
+            {
+                exec_state = NCF_SKIP;
+				ereport(error_level,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 						 errmsg("invalid OID in COPY data")));
+            }
 			cstate->cur_attname = NULL;
 			if (cstate->oids && tupleOid != NULL)
 				*tupleOid = loaded_oid;
@@ -3437,7 +3493,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 										 &nulls[defmap[i]]);
 	}
 
-	return true;
+	return exec_state;
 }
 
 /*
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index f081f2219f..c4d38dbea7 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -31,7 +31,7 @@ extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_fro
 extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename,
 			  bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options);
 extern void EndCopyFrom(CopyState cstate);
-extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
+extern int NextCopyFrom(CopyState cstate, ExprContext *econtext,
 			 Datum *values, bool *nulls, Oid *tupleOid);
 extern bool NextCopyFromRawFields(CopyState cstate,
 					  char ***fields, int *nfields);
-- 
2.11.0


From 7acc7b951c75f7e307f176d5ad991abe21592eb4 Mon Sep 17 00:00:00 2001
From: Alex K <alex.lumir@gmail.com>
Date: Sun, 11 Jun 2017 16:48:37 +0300
Subject: [PATCH 2/2] Report line number on each ERROR/WARNING

---
 src/backend/commands/copy.c | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index d40d753c47..ed2cc87c8a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -3279,7 +3279,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
             exec_state = NCF_SKIP;
 			ereport(error_level,
 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("extra data after last expected column")));
+					 errmsg("extra data after last expected column at line %d", cstate->cur_lineno)));
         }
 
 
@@ -3293,7 +3293,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
                 exec_state = NCF_SKIP;
 				ereport(error_level,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("missing data for OID column")));
+						 errmsg("missing data for OID column at line %d", cstate->cur_lineno)));
             }
 
 			string = field_strings[fieldno++];
@@ -3303,7 +3303,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
                 exec_state = NCF_SKIP;
 				ereport(error_level,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("null OID in COPY data")));
+						 errmsg("null OID in COPY data at line %d", cstate->cur_lineno)));
             }
 			else if (cstate->oids && tupleOid != NULL)
 			{
@@ -3316,7 +3316,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
                     exec_state = NCF_SKIP;
 					ereport(error_level,
 							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-							 errmsg("invalid OID in COPY data")));
+							 errmsg("invalid OID in COPY data at line %d", cstate->cur_lineno)));
                 }
 
 				cstate->cur_attname = NULL;
@@ -3335,8 +3335,8 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
                 exec_state = NCF_SKIP;
 				ereport(error_level,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("missing data for column \"%s\"",
-								NameStr(attr[m]->attname))));
+						 errmsg("missing data for column \"%s\" at line %d",
+								NameStr(attr[m]->attname), cstate->cur_lineno)));
             }
 
 			string = field_strings[fieldno++];
-- 
2.11.0

